ZooKeeper開(kāi)源客戶端Curator的使用

Curator簡(jiǎn)介

Curator是Netllix公司開(kāi)源的一套Zookeeper客戶端框架,Curator解決了許多Zookeeper客戶端非常底層的細(xì)節(jié)開(kāi)發(fā)工作,包括連接重連、反復(fù)注冊(cè)Watcher和NodeExistsException異常等,目前已經(jīng)成為了Apache的頂級(jí)項(xiàng)目,是全世界范圍內(nèi)使用最廣泛的Zookeeper客戶端之一。
除了封裝一些開(kāi)發(fā)人員不需要特別關(guān)注的底層細(xì)節(jié)之外,Curator還在Zookeeper客戶端原生API的基礎(chǔ)上進(jìn)行了包裝,提供了一套易用性和可讀性更強(qiáng)的Fluent風(fēng)格的客戶端API框架。
除此之外,Curator還提供了Zookeeper各種應(yīng)用場(chǎng)景(Recipe,如共享服務(wù),Master選舉機(jī)制和分布式計(jì)數(shù)器等)。

API簡(jiǎn)單使用

使用Curator的API,首先需要引入Curator的Maven依賴:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.13.0</version>
</dependency>

使用Curator創(chuàng)建會(huì)話

public class CreateSession {

    public static void main(String[] args) throws Exception {
        //創(chuàng)建重試策略
        // 參數(shù)1:baseSleepTimeMs 初始sleep時(shí)間
        // 參數(shù)2:maxRetries 最大重試次數(shù)
        // 參數(shù)3:maxSleepMs 最大sleep時(shí)間(這里沒(méi)有使用到該參數(shù))
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);

        // 參數(shù)1:connectString
        // 參數(shù)2:sessionTimeoutMs
        // 參數(shù)3:connectionTimeoutMs
        // 參數(shù)4:retryPolicy
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181", 5000, 300, retryPolicy);

        client.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}

使用Fluent風(fēng)格的API接口來(lái)創(chuàng)建會(huì)話

public class FluentCreateSession {

    public static void main(String[] args) throws Exception {

        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);

        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(300)
                .retryPolicy(retryPolicy)
                .namespace("base")  // 定義獨(dú)立命名空間為/base,該客戶端對(duì)于Zookeeper上的數(shù)據(jù)節(jié)點(diǎn)進(jìn)行操作都是基于/base節(jié)點(diǎn)進(jìn)行的,從而實(shí)現(xiàn)不同業(yè)務(wù)的隔離
                .build();

        client.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}

創(chuàng)建節(jié)點(diǎn)

public class CreateNode {

    private static String path = "/zk-book/c1";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();

        client.create()
                .creatingParentsIfNeeded()  //自動(dòng)遞歸創(chuàng)建父節(jié)點(diǎn),父節(jié)點(diǎn)都是持久節(jié)點(diǎn)
                .withMode(CreateMode.EPHEMERAL)  //節(jié)點(diǎn)屬性:臨時(shí)節(jié)點(diǎn),默認(rèn)為持久節(jié)點(diǎn)
                .forPath(path, "init".getBytes());  //創(chuàng)建一個(gè)節(jié)點(diǎn),附帶初始內(nèi)容
    }
}

刪除節(jié)點(diǎn)

public class DeleteNode {

    private static String path = "/zk-book";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();

        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, "init".getBytes());

        // 讀取剛創(chuàng)建的節(jié)點(diǎn)數(shù)據(jù)
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);

        client.delete()
                //.guaranteed() // 強(qiáng)制刪除,只要客戶端會(huì)話優(yōu)效,在后臺(tái)反復(fù)重試,直到節(jié)點(diǎn)刪除成功
                .deletingChildrenIfNeeded()
                .withVersion(stat.getVersion()) //刪除指定版本的節(jié)點(diǎn)
                .forPath(path);
    }
}

讀取數(shù)據(jù)

public class GetNodeData {

    private static String path = "/zk-book/c1";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();

        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, "init".getBytes());

        // 通過(guò)一個(gè)Stat變量存儲(chǔ)服務(wù)端返回的最新的節(jié)點(diǎn)狀態(tài)信息
        Stat stat = new Stat();
        // 讀取一個(gè)節(jié)點(diǎn)的數(shù)據(jù)內(nèi)容,并且獲取到該節(jié)點(diǎn)的stat
        byte[] bytes = client.getData().storingStatIn(stat).forPath(path);
        System.out.println("節(jié)點(diǎn)數(shù)據(jù):" + new String(bytes));
    }
}

更新數(shù)據(jù)

public class SetNodeData {

    private static String path = "/zk-book/c1";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();

        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, "init".getBytes());

        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);

        // 第一次更新成功
        System.out.println("Success set node for: " + path + " new version: " +
                client.setData().withVersion(stat.getVersion()).forPath(path).getVersion());

        // 第二次更新失敗
        try {
            client.setData().withVersion(stat.getVersion()).forPath(path);
        } catch (Exception e) {
            System.out.println("Failed set node due to " + e.getMessage());
        }
    }
}

異步接口

public class AsyncCreateNode {

    private static String path = "/zk-book/asynccreatenode";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    private static CountDownLatch semaphore = new CountDownLatch(2);

    private static ExecutorService executorService = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws Exception {
        client.start();

        System.out.println("Main thread: " + Thread.currentThread().getName());

        // 使用自定義的線程池處理回調(diào)結(jié)果
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                // event[code: 0, type: CREATE]
                System.out.println("event[code: " + curatorEvent.getResultCode() + ", type: " + curatorEvent.getType() + "]");
                // Thread of processResult: pool-3-thread-1
                System.out.println("Thread of processResult: " + Thread.currentThread().getName());

                semaphore.countDown();
            }
        }, executorService).forPath(path, "init".getBytes());

        // 使用ZooKeeper默認(rèn)的EventThread處理
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                // event[code: -110, type: CREATE]
                System.out.println("event[code: " + curatorEvent.getResultCode() + ", type: " + curatorEvent.getType() + "]");
                // Thread of processResult: main-EventThread
                System.out.println("Thread of processResult: " + Thread.currentThread().getName()); // main-EventThread

                semaphore.countDown();
            }
        }).forPath(path, "init".getBytes());

        semaphore.await();
        executorService.shutdown();
    }
}

典型使用場(chǎng)景

要使用Curator典型場(chǎng)景需要引入Maven依賴:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.13.0</version>
</dependency>
事件監(jiān)聽(tīng)

ZooKeeper原生支持通過(guò)注冊(cè)Watcher來(lái)進(jìn)行事件監(jiān)聽(tīng),但是其使用并不是特別方便,需要開(kāi)發(fā)人員自己反復(fù)注冊(cè)Watcher,比較繁瑣。Curator引入了Cache來(lái)實(shí)現(xiàn)對(duì)ZooKeeper服務(wù)端事件的監(jiān)聽(tīng)。Cache是Curator中對(duì)事件監(jiān)聽(tīng)的包裝,其對(duì)事件的監(jiān)聽(tīng)其實(shí)可以近似看作是一個(gè)本地視圖和遠(yuǎn)程ZooKeeper視圖的對(duì)比過(guò)程。同時(shí)Curator能自動(dòng)為開(kāi)發(fā)人員處理反復(fù)事件監(jiān)聽(tīng),從而大大簡(jiǎn)化了原生API開(kāi)發(fā)的繁瑣過(guò)程。Cache分為兩類監(jiān)聽(tīng)類型:節(jié)點(diǎn)監(jiān)聽(tīng)和子節(jié)點(diǎn)監(jiān)聽(tīng)。

節(jié)點(diǎn)監(jiān)聽(tīng)NodeCache示例:

/**
 * 用于監(jiān)聽(tīng)指定ZooKeeper數(shù)據(jù)節(jié)點(diǎn)本身的變化
 */
public class NodeCacheSample {

    private static String path = "/zk-book/nodecache";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args)  throws Exception {
        client.start();
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, "init".getBytes());

        // NodeCache: 用于監(jiān)聽(tīng)指定ZooKeeper數(shù)據(jù)節(jié)點(diǎn)本身的變化
        final NodeCache nodeCache = new NodeCache(client, path, false);

        // true:在第一次啟動(dòng)的時(shí)候就會(huì)從ZooKeepe上讀取對(duì)應(yīng)節(jié)點(diǎn)的數(shù)據(jù)內(nèi)容
        nodeCache.start(true);

        // 事件處理的回調(diào)函數(shù)
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("Node data updated, new data: " + new String(nodeCache.getCurrentData().getData()));
            }
        });

        client.setData().forPath(path, "u".getBytes());
        Thread.sleep(1000);
        // 節(jié)點(diǎn)被刪除,無(wú)法觸發(fā)NodeCacheListener
        client.delete().deletingChildrenIfNeeded().forPath(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

子節(jié)點(diǎn)監(jiān)聽(tīng)PathChildrenCache示例:

/**
* 用于監(jiān)聽(tīng)指定ZooKeeper數(shù)據(jù)節(jié)點(diǎn)的子節(jié)點(diǎn)的變化
* Curator無(wú)法監(jiān)聽(tīng)二級(jí)子節(jié)點(diǎn)
*/
public class PathChildrenCacheSample {

   private static String path = "/zk-book";

   private static CuratorFramework client = CuratorFrameworkFactory.builder()
           .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
           .sessionTimeoutMs(5000)
           .connectionTimeoutMs(300)
           .retryPolicy(new ExponentialBackoffRetry(1000, 3))
           .build();

   public static void main(String[] args) throws Exception {
       client.start();

       // 對(duì)/zk-book節(jié)點(diǎn)進(jìn)行子節(jié)點(diǎn)變更事件的監(jiān)聽(tīng)
       PathChildrenCache cache = new PathChildrenCache(client, path, true);

       cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

       cache.getListenable().addListener(new PathChildrenCacheListener() {
           @Override
           public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
               switch (event.getType()) {
                   case CHILD_ADDED:   // 子節(jié)點(diǎn)創(chuàng)建事件
                       System.out.println("CHILD_ADDED: " + event.getData().getPath());
                       break;
                   case CHILD_UPDATED: // 子節(jié)點(diǎn)更新事件
                       System.out.println("CHILD_UPDATED: " + event.getData().getPath());
                       break;
                   case CHILD_REMOVED: // 子節(jié)點(diǎn)刪除節(jié)點(diǎn)
                       System.out.println("CHILD_REMOVED: " + event.getData().getPath());
                       break;
                   default:
                       break;
               }
           }
       });

       client.create().withMode(CreateMode.PERSISTENT).forPath(path);
       Thread.sleep(1000);
       client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
       Thread.sleep(1000);
       client.delete().forPath(path + "/c1");
       Thread.sleep(1000);
       client.delete().forPath(path);
       Thread.sleep(Integer.MAX_VALUE);
   }
}
Master選舉

大體思路:選擇一個(gè)根節(jié)點(diǎn),例如/master_select,多臺(tái)機(jī)器同時(shí)向該節(jié)點(diǎn)創(chuàng)建一個(gè)子節(jié)點(diǎn)/master_select/lock,利用ZooKeeper特性,最終只有一臺(tái)機(jī)器創(chuàng)建成功,成功的那臺(tái)機(jī)器就成為Master。
Curator也是基于這個(gè)思路,只不過(guò)它將節(jié)點(diǎn)創(chuàng)建、事件監(jiān)聽(tīng)和自動(dòng)選舉過(guò)程進(jìn)行了封裝,開(kāi)發(fā)人員只需要調(diào)用簡(jiǎn)單的API即可實(shí)現(xiàn)Master選舉。

public class MasterSelect {

    // Master選舉的根節(jié)點(diǎn),本次Master選舉都是在該節(jié)點(diǎn)下進(jìn)行的
    private static String master_path = "/curator_recipes_master_path";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws InterruptedException {
        client.start();

        LeaderSelector selector = new LeaderSelector(client, master_path, new LeaderSelectorListenerAdapter() {
            // 獲取Master權(quán)力的時(shí)候回調(diào)該函數(shù)
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                System.out.println("成為Master角色");
                Thread.sleep(3000);
                System.out.println("完成Master操作,釋放Master權(quán)力");
            }
        });

        selector.autoRequeue();
        selector.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}
分布式鎖

一個(gè)典型的時(shí)間戳生成的并發(fā)問(wèn)題:

/**
 * 沒(méi)有使用分布式鎖,生成時(shí)間戳,會(huì)有重復(fù)的生成
 */
public class NoLock {

    public static void main(String[] args) {

        final CountDownLatch down = new CountDownLatch(1);

        // 開(kāi)啟10個(gè)線程生成時(shí)間戳訂單號(hào)
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        down.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                    String orderNo = sdf.format(new Date());
                    System.out.println("生成的訂單號(hào)是: " + orderNo);
                }
            }).start();
        }

        down.countDown();
    }
}

使用Curator實(shí)現(xiàn)分布式鎖功能

/**
 * 使用Curator實(shí)現(xiàn)分布式鎖功能,生成的時(shí)間戳不會(huì)重復(fù)
 * InterProcessMutex
 * lock.acquire()
 * lock.release()
 */
public class DistributedLock {

    private static String lock_path = "/curator_recipes_lock_path";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) {
        client.start();
        final InterProcessMutex lock = new InterProcessMutex(client, lock_path);    // InterProcessMutex:互斥鎖
        final CountDownLatch down = new CountDownLatch(1);

        for (int i = 0; i < 100; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        down.await();
                        lock.acquire(); // 獲取分布式鎖
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                    String orderNo = sdf.format(new Date());
                    System.out.println("生成的訂單號(hào)是: " + orderNo);
                    try {
                        lock.release(); // 釋放分布式鎖
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        down.countDown();
    }
}
分布式計(jì)數(shù)器
/**
 * 使用Curator分布式計(jì)數(shù)器
 * 典型應(yīng)用場(chǎng)景:統(tǒng)計(jì)系統(tǒng)的在線人數(shù)
 * 思路:指定一個(gè)ZooKeeper數(shù)據(jù)節(jié)點(diǎn)作為計(jì)數(shù)器,多個(gè)應(yīng)用實(shí)例在分布式鎖的控制下,通過(guò)更新該數(shù)據(jù)節(jié)點(diǎn)的內(nèi)容來(lái)實(shí)現(xiàn)計(jì)數(shù)功能
 */
public class DistributedCounter {

    private static String counter_path = "/curator_recipes_counter_path";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();

        DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client, counter_path, new RetryNTimes(3, 1000));

        AtomicValue<Integer> rc = atomicInteger.add(8);
        System.out.println("Result: " + rc.succeeded());
        System.out.println("最新值: " + rc.postValue());
        System.out.println("原始值: " + rc.preValue());
    }
}
分布式Barrier

方式一:主線程觸發(fā)釋放Barrier: DistributedBarrier

/**
 * 使用Curator實(shí)現(xiàn)分布式的CyclicBarrier
 * 主線程觸發(fā)釋放Barrier: DistributedBarrier
 */
public class MainDistributedBarrier {

    private static String barrier_path = "/curator_recipes_barrier_path";

    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();

        // 初始化分布式barrier
        DistributedBarrier barrier = new DistributedBarrier(client, barrier_path);

        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + "號(hào)barrier設(shè)置");

                        // 設(shè)置分布式barrier
                        barrier.setBarrier();
                        // 等待主線程barrier釋放
                        barrier.waitOnBarrier();

                        System.err.println("啟動(dòng)...");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();

        }
        Thread.sleep(2000);
        // 主線程釋放barrier,觸發(fā)所有等待該Barrier的線程同時(shí)進(jìn)行各自的業(yè)務(wù)邏輯
        barrier.removeBarrier();
    }
}

方式二:線程自發(fā)觸發(fā)釋放Barrier: DistributedDoubleBarrier

/**
 * 使用Curator實(shí)現(xiàn)分布式的CyclicBarrier
 * 線程自發(fā)觸發(fā)釋放Barrier: DistributedDoubleBarrier
 * 指定Barrier的成員數(shù)閾值,控制同時(shí)進(jìn)入和退出
 */
public class SelfDistributedBarrier {

    private static String barrier_path = "/curator_recipes_barrier2_path";

    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) {
        client.start();

        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 指定進(jìn)入Barrier的成員數(shù)閾值5
                        DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, barrier_path, 5);

                        Thread.sleep(Math.round(Math.random() * 3000));
                        System.out.println(Thread.currentThread().getName() + "號(hào)進(jìn)入barrier");

                        // 處于準(zhǔn)備進(jìn)入狀態(tài),一旦達(dá)到閾值5,同時(shí)觸發(fā)進(jìn)入
                        barrier.enter();
                        System.out.println("啟動(dòng)...");
                        Thread.sleep(Math.round(Math.random() * 3000));

                        // 處于準(zhǔn)備退出狀態(tài),一旦達(dá)到閾值5,同時(shí)觸發(fā)退出
                        barrier.leave();
                        System.out.println("退出...");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}
工具

ZKPaths

/**
 * 提供了一些簡(jiǎn)單的API來(lái)構(gòu)建ZNode路徑、遞歸創(chuàng)建和刪除節(jié)點(diǎn)
 */
public class ZKPathsUtil {

    private static String path = "/zk-book/zkpathsutil";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();

        ZooKeeper zooKeeper = client.getZookeeperClient().getZooKeeper();

        System.out.println(ZKPaths.fixForNamespace(path, "/sub"));
        System.out.println(ZKPaths.makePath(path, "sub"));
        System.out.println(ZKPaths.getNodeFromPath(path + "/sub1"));

        PathAndNode pn = ZKPaths.getPathAndNode(path + "/sub1");
        System.out.println(pn.getPath());
        System.out.println(pn.getNode());

        String dir1 = path + "/child1";
        String dir2 = path + "/child2";
        ZKPaths.mkdirs(zooKeeper, dir1);
        ZKPaths.mkdirs(zooKeeper, dir2);
        System.out.println(ZKPaths.getSortedChildren(zooKeeper, path));

        ZKPaths.deleteChildren(zooKeeper, path, true);
    }
}

EnsurePath
EnsurePath采取了靜默的節(jié)點(diǎn)創(chuàng)建方式,其內(nèi)部實(shí)現(xiàn)就是試圖創(chuàng)建指定節(jié)點(diǎn),如果節(jié)點(diǎn)已經(jīng)存在,那么就不進(jìn)行任何操作,也不對(duì)外拋出異常,否則正常創(chuàng)建節(jié)點(diǎn)。

/**
 * EnsurePath提供了一種能夠確保數(shù)據(jù)節(jié)點(diǎn)存在的機(jī)制
 */
public class EnsurePathUtil {
    private static String path = "/zk-book/EnsurePathUtil";

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(300)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();

        client.usingNamespace("zk-book");

        EnsurePath ensurePath = new EnsurePath(path);
        ensurePath.ensure(client.getZookeeperClient());
        ensurePath.ensure(client.getZookeeperClient());

        EnsurePath ensurePath2 = client.newNamespaceAwareEnsurePath("/EnsurePathUtil");
        ensurePath2.ensure(client.getZookeeperClient());
    }
}

TestServer
為了方便開(kāi)發(fā)人員進(jìn)行ZooKeeper的開(kāi)發(fā)與測(cè)試Curator提供了一種建議啟動(dòng)ZooKeeper服務(wù)和集群的方法—TestingServer、TestingServer。首先需要引入Maven依賴:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>2.13.0</version>
</dependency>
/**
 * 在啟動(dòng)一個(gè)標(biāo)準(zhǔn)的ZK服務(wù)器,用于單元測(cè)試
 */
public class TestServer {

    private static String path = "/zookeeper";

    public static void main(String[] args) throws Exception {
        // 初始化ZK測(cè)試服務(wù)器
        TestingServer testingServer = new TestingServer(2181, new File("F:\\OneDrive\\study\\zookeeper\\testingserver\\data"));

        // 連接ZK測(cè)試服務(wù)器
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(testingServer.getConnectString())
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();

        client.start();

        System.out.println(client.getChildren().forPath(path));

        testingServer.close();
    }
}

TestCluster

/**
* 在本地模擬一個(gè)ZK集群,用于單元測(cè)試
*/
public class TestCluster {

   public static void main(String[] args) throws Exception {
       // 搭建一個(gè)三臺(tái)ZK服務(wù)器的集群
       TestingCluster cluster = new TestingCluster(3);
       // 運(yùn)行集群
       cluster.start();

       Thread.sleep(2000);

       TestingZooKeeperServer leader = null;

       for (TestingZooKeeperServer zs : cluster.getServers()) {
           System.out.print(zs.getInstanceSpec().getServerId() + "-");
           System.out.print(zs.getQuorumPeer().getServerState() + "-");
           System.out.println(zs.getInstanceSpec().getDataDirectory().getAbsolutePath());

           if (zs.getQuorumPeer().getServerState().equals("leading")) {
               leader = zs;
           }
       }
       // Kill掉leader節(jié)點(diǎn),重新進(jìn)行master選舉
       leader.kill();

       Thread.sleep(30000);

       System.out.println("--After leader kill:");
       for (TestingZooKeeperServer zs : cluster.getServers()) {
           System.out.print(zs.getInstanceSpec().getServerId() + "-");
           System.out.print(zs.getQuorumPeer().getServerState() + "-");
           System.out.println(zs.getInstanceSpec().getDataDirectory().getAbsolutePath());
       }
       // 關(guān)閉集群
       cluster.stop();
   }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容