Zookeeper 客戶端 Curator

Zookeeper 客戶端 Curator

概述

CuratorNetflix公司開源的一套Zookeeper客戶端框架,解決了很多Zookeeper客戶端非常底層的細(xì)節(jié)開發(fā)工作,包括連接重連、反復(fù)注冊WatcherNodeExistsException異常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”給Curator予高度評價。

Curator包含了幾個包:

  • curator-framework:對Zookeeper的底層API的一些封裝。
  • curator-client:提供一些客戶端的操作,例如重試策略等。
  • curator-recipes:封裝了一些高級特性,如:Cache事件監(jiān)聽、選舉、分布式鎖、分布式計數(shù)器、分布式Barrier等。

客戶端

創(chuàng)建

靜態(tài)方法創(chuàng)建

// 重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
                        connectionInfo,
                        5000,
                        3000,
                        retryPolicy);

Fluent API 創(chuàng)建

// 重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
        .connectString(connectionInfo)
        .sessionTimeoutMs(5000)
        .connectionTimeoutMs(5000)
        .retryPolicy(retryPolicy)
        .build();

創(chuàng)建包含命名空間的客戶端

// 重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")  // 命名空間,即該客戶端可操作目錄的根目錄
                .build();

啟動

client.start();

節(jié)點

Zookeeper 節(jié)點類型

  • PERSISTENT: 持久化
  • PERSISTENT_SEQUENTIAL: 持久化并且?guī)蛄刑?/li>
  • EPHEMERAL: 臨時
  • EPHEMERAL_SEQUENTIAL: 臨時并且?guī)蛄刑?/li>

創(chuàng)建

// 創(chuàng)建節(jié)點,內(nèi)容為空
client.create().forPath("path");
// 創(chuàng)建節(jié)點,內(nèi)容為“init”
client.create().forPath("path","init".getBytes());
// 創(chuàng)建臨時節(jié)點,內(nèi)容為空
client.create().withMode(CreateMode.EPHEMERAL).forPath("path");
// 創(chuàng)建臨時節(jié)點,內(nèi)容為“init”
client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());
// 創(chuàng)建臨時節(jié)點,內(nèi)容為“init”,遞歸創(chuàng)建父節(jié)點
client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

刪除

// 刪除節(jié)點
client.delete().forPath("path");
// 刪除節(jié)點,遞歸刪除子節(jié)點
client.delete().deletingChildrenIfNeeded().forPath("path");
// 刪除制定版本節(jié)點
client.delete().withVersion(10086).forPath("path");
// 強制刪除節(jié)點
client.delete().guaranteed().forPath("path");

讀取

// 讀取節(jié)點內(nèi)容
client.getData().forPath("path");
// 讀取節(jié)點內(nèi)容及狀態(tài)
client.getData().storingStatIn(stat).forPath("path");

更新

// 更新節(jié)點
client.setData().forPath("path","data".getBytes());
// 指定版本更新節(jié)點
client.setData().withVersion(10086).forPath("path","data".getBytes());

節(jié)點是否存在

client.checkExists().forPath("path");

獲取所有子節(jié)點

client.getChildren().forPath("path");

事務(wù)

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();

異步接口

Curator提供異步接口,引入了BackgroundCallback接口用于處理異步接口調(diào)用之后服務(wù)端返回的結(jié)果信息。BackgroundCallback接口中一個重要的回調(diào)值為CuratorEvent,里面包含事件類型、響應(yīng)嗎和節(jié)點的詳細(xì)信息。

CuratorEventType

事件類型 對應(yīng)CuratorFramework實例的方法
CREATE create()
DELETE delete()
EXISTS checkExists()
GET_DATA getData()
SET_DATA setData()
CHILDREN getChildren()
SYNC sync(String,Object)
GET_ACL getACL()
SET_ACL setACL()
WATCHED Watcher(Watcher)
CLOSING close()

響應(yīng)碼

響應(yīng)碼 意義
0 OK,即調(diào)用成功
-4 ConnectionLoss,即客戶端與服務(wù)端斷開連接
-110 NodeExists,即節(jié)點已經(jīng)存在
-112 SessionExpired,即會話過期
Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {      System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
      },executor)  //如果inBackground()方法不指定executor,那么會默認(rèn)使用Curator的EventThread去進(jìn)行異步處理
      .forPath("path");

緩存

強烈推薦使用ConnectionStateListener監(jiān)控連接的狀態(tài),當(dāng)連接狀態(tài)為LOST,curator-recipes下的所有Api將會失效或者過期,盡管后面所有的例子都沒有使用到ConnectionStateListener。

Zookeeper原生支持通過注冊Watcher來進(jìn)行事件監(jiān)聽,但是開發(fā)者需要反復(fù)注冊(Watcher只能單次注冊單次使用)。Cache是Curator中對事件監(jiān)聽的包裝,可以看作是對事件監(jiān)聽的本地緩存視圖,能夠自動為開發(fā)者處理反復(fù)注冊監(jiān)聽。Curator提供了三種Watcher(Cache)來監(jiān)聽結(jié)點的變化。

Path Cache

Path Cache用來監(jiān)控一個ZNode的子節(jié)點。當(dāng)一個子節(jié)點增加,更新,刪除時,Path Cache會改變它的狀態(tài), 會包含最新的子節(jié)點,子節(jié)點的數(shù)據(jù)和狀態(tài),而狀態(tài)的更變將通過PathChildrenCacheListener通知。

實際使用時會涉及到四個類:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

想使用cache,必須調(diào)用它的start方法,使用完后調(diào)用close方法。 可以設(shè)置StartMode來實現(xiàn)啟動的模式。

StartMode有下面幾種:

  1. NORMAL:正常初始化。
  2. BUILD_INITIAL_CACHE:在調(diào)用start()之前會調(diào)用rebuild()
  3. POST_INITIALIZED_EVENT: 當(dāng)Cache初始化數(shù)據(jù)后發(fā)送一個PathChildrenCacheEvent.Type#INITIALIZED事件

public void addListener(PathChildrenCacheListener listener) 可以增加listener監(jiān)聽緩存的變化。

getCurrentData()方法返回一個List<ChildData>對象,可以遍歷所有的子節(jié)點。

設(shè)置/更新、移除其實是使用client (CuratorFramework)來操作, 不通過PathChildrenCache操作:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {

        // 創(chuàng)建客戶端
        String connectInfo = "localhost:2181";
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectInfo)
                        .sessionTimeoutMs(5000)
                        .connectionTimeoutMs(5000)
                        .retryPolicy(retryPolicy)
                        .build();

        // 啟動
        client.start();

        // cache
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        // cache start
        cache.start();
        // cache listener
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件類型:" + event.getType());
            if (null != event.getData()) {
                System.out.println("節(jié)點數(shù)據(jù):" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);

        // test
        client.create().creatingParentsIfNeeded().forPath(PATH + "/test01", "01".getBytes());
        ThreadUtil.sleep(10);
        client.create().creatingParentsIfNeeded().forPath(PATH + "/test02", "02".getBytes());
        ThreadUtil.sleep(10);
        client.setData().forPath(PATH + "/test01", "01_V2".getBytes());
        ThreadUtil.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath(PATH + "/test01");
        ThreadUtil.sleep(10);
        client.delete().forPath(PATH + "/test02");
        ThreadUtil.sleep(10);
        
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:如果new PathChildrenCache(client, PATH, true)中的參數(shù)cacheData值設(shè)置為false,則示例中的event.getData().getData()、data.getData()將返回null,cache將不會緩存節(jié)點數(shù)據(jù)。

注意:示例中的Thread.sleep(10)可以注釋掉,但是注釋后事件監(jiān)聽的觸發(fā)次數(shù)會不全,這可能與PathCache的實現(xiàn)原理有關(guān),不能太過頻繁的觸發(fā)事件!

Node Cache

Node Cache與Path Cache類似,Node Cache只是監(jiān)聽某一個特定的節(jié)點。它涉及到下面的三個類:

  • NodeCache - Node Cache實現(xiàn)類
  • NodeCacheListener - 節(jié)點監(jiān)聽器
  • ChildData - 節(jié)點數(shù)據(jù)

注意:使用cache,依然要調(diào)用它的start()方法,使用完后調(diào)用close()方法。

getCurrentData()將得到節(jié)點當(dāng)前的狀態(tài),通過它的狀態(tài)可以得到當(dāng)前的值。

public class NodeCacheDemo {

    private static final String PATH = "/example/nodeCache";

    public static void main(String[] args) throws Exception {

        // 創(chuàng)建客戶端
        String connectInfo = "localhost:2181";
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectInfo)
                        .sessionTimeoutMs(5000)
                        .connectionTimeoutMs(5000)
                        .retryPolicy(retryPolicy)
                        .build();

        // 啟動
        client.start();

        client.create().creatingParentsIfNeeded().forPath(PATH);

        // cache
        NodeCache cache = new NodeCache(client, PATH);
        // cache start
        cache.start();
        // cache listener
        NodeCacheListener cacheListener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("節(jié)點數(shù)據(jù):" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("節(jié)點被刪除!");
            }
        };
        cache.getListenable().addListener(cacheListener);

        // test
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(10);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(10);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(10);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:示例中的Thread.sleep(10)可以注釋,但是注釋后事件監(jiān)聽的觸發(fā)次數(shù)會不全,這可能與NodeCache的實現(xiàn)原理有關(guān),不能太過頻繁的觸發(fā)事件!

注意:NodeCache只能監(jiān)聽一個節(jié)點的狀態(tài)變化。

Tree Cache

Tree Cache可以監(jiān)控整個樹上的所有節(jié)點,類似于PathCache和NodeCache的組合,主要涉及到下面四個類:

  • TreeCache - Tree Cache實現(xiàn)類
  • TreeCacheListener - 監(jiān)聽器類
  • TreeCacheEvent - 觸發(fā)的事件類
  • ChildData - 節(jié)點數(shù)據(jù)
public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {

        // 創(chuàng)建客戶端
        String connectInfo = "localhost:2181";
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectInfo)
                        .sessionTimeoutMs(5000)
                        .connectionTimeoutMs(5000)
                        .retryPolicy(retryPolicy)
                        .build();

        // 啟動
        client.start();

        client.create().creatingParentsIfNeeded().forPath(PATH);

        // cache
        TreeCache cache = new TreeCache(client, PATH);
        // cache start
        cache.start();
        // cache listener
        TreeCacheListener cacheListener = (client1, event) -> System.out.println("事件類型:" + event.getType() +
                " | 路徑:" + (null != event.getData() ? event.getData().getPath() : null) +
                " | 數(shù)據(jù):" + (null != event.getData() ? new String(event.getData().getData()) : null));

        cache.getListenable().addListener(cacheListener);

        // test
        client.setData().forPath(PATH, "01".getBytes());
//        Thread.sleep(10);
        client.setData().forPath(PATH, "02".getBytes());
//        Thread.sleep(10);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
//        Thread.sleep(10);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:在此示例中沒有使用Thread.sleep(10),但是事件觸發(fā)次數(shù)也是正常的。

注意:TreeCache在初始化(調(diào)用start()方法)的時候會回調(diào)TreeCacheListener實例一個事TreeCacheEvent,而回調(diào)的TreeCacheEvent對象的Type為INITIALIZED,ChildData為null,此時event.getData().getPath()很有可能導(dǎo)致空指針異常,這里應(yīng)該主動處理并避免這種情況。

選舉

分布式鎖

分布式隊列

分布式計數(shù)器

參考

zookeeper-curator-usage
curator.apache.org

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容