ZooKeeper客戶端Curator使用

? ? ? ?Curator是Netflix公司開源的一套zookeeper客戶端框架.解決了很多Zookeeper客戶端非常底層的細(xì)節(jié)開發(fā)工作,包括連接重連、反復(fù)注冊Watcher和NodeExistsException異常等等。Curator被看做是zookeeper客戶端框里面的瑞士軍刀(牛逼了)。Curator使得我們開發(fā)zookeeper客戶端程序變的很容易。

? ? ? ?Curator框架包含三個(gè)主要的包:

  • curator-framework:對zookeeper的底層api的一些封裝。
  • curator-client:提供一些客戶端的操作,例如重試策略等。
  • curator-recipes:封裝了一些高級特性,例如Cache事件監(jiān)聽、選舉、分布式鎖、分布式計(jì)數(shù)器、分布式Barrier等。

? ? ? ?Curator的引入(pom方式,版本可能有變化)。

        <!-- zookeeper -->
        <!-- 對zookeeper的底層api的一些封裝 -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.1</version>
        </dependency>
        <!-- 提供一些客戶端的操作,例如重試策略等 -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>4.0.1</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- 封裝了一些高級特性,如:Cache事件監(jiān)聽、選舉、分布式鎖、分布式計(jì)數(shù)器、分布式Barrier等 -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.10</version>
            <!--排除這個(gè)slf4j-log4j12-->
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

一 Curator的基本用法

1.1 創(chuàng)建zookeeper客戶端

? ? ? ?在Curator中CuratorFramework對象就代表一個(gè)zookeeper客戶端。所以創(chuàng)建創(chuàng)建zookeeper客戶端就是創(chuàng)建CuratorFramework對象。CuratorFramework對象又可以通過CuratorFrameworkFactory來創(chuàng)建。

CuratorFramework api介紹如下

public interface CuratorFramework {

    /**
     * 啟動(dòng)zookeeper客戶端
     */
    public void start();

    /**
     * 關(guān)閉zookeeper客戶端
     */
    public void close();

    /**
     * 返回客戶端狀態(tài):LATENT、STARTED、STOPPED
     */
    public CuratorFrameworkState getState();

    /**
     * 客戶端是否已經(jīng)啟動(dòng)
     */
    @Deprecated
    public boolean isStarted();

    /**
     * 創(chuàng)建節(jié)點(diǎn)的建造器
     */
    public CreateBuilder create();

    /**
     * 刪除節(jié)點(diǎn)的建造器
     */
    public DeleteBuilder delete();

    /**
     * 檢查節(jié)點(diǎn)是否存在的建造器
     */
    public ExistsBuilder checkExists();

    /**
     * 獲取接連數(shù)據(jù)的建造器
     */
    public GetDataBuilder getData();

    /**
     * 設(shè)置節(jié)點(diǎn)數(shù)據(jù)的建造器
     */
    public SetDataBuilder setData();

    /**
     * 獲取子節(jié)點(diǎn)的建造器
     */
    public GetChildrenBuilder getChildren();

    /**
     * 獲取權(quán)限的構(gòu)造器
     */
    public GetACLBuilder getACL();

    /**
     * 設(shè)置權(quán)限的構(gòu)造器
     */
    public SetACLBuilder setACL();

    /**
     * 重新配置的建造器
     */
    public ReconfigBuilder reconfig();

    /**
     * 獲取配置的建造器
     */
    public GetConfigBuilder getConfig();

    /**
     * 事務(wù)構(gòu)造器
     * @deprecated use {@link #transaction()} instead
     */
    public CuratorTransaction inTransaction();

    /**
     * 事務(wù)構(gòu)造器
     */
    public CuratorMultiTransaction transaction();

    /**
     * 分配可與{transaction()}一起使用的操作
     */
    public TransactionOp transactionOp();


    /**
     * 如果路徑不存在,則創(chuàng)建路徑對應(yīng)的節(jié)點(diǎn)
     */
    public void createContainers(String path) throws Exception;

    /**
     * 啟動(dòng)同步構(gòu)建器。注意:即使您不使用其中一種background()方法,同步也始終在后臺
     */
    public SyncBuilder sync();

    /**
     * 啟動(dòng)remove watch builder,有節(jié)點(diǎn)刪除的時(shí)候會(huì)調(diào)用
     */
    public RemoveWatchesBuilder watches();

    /**
     *
     * 返回Connect State的可偵聽接口
     */
    public Listenable<ConnectionStateListener> getConnectionStateListenable();

    /**
     * 返回事件的可偵聽接口
     */
    public Listenable<CuratorListener> getCuratorListenable();

    /**
     * 返回未處理錯(cuò)誤的可偵聽接口
     */
    public Listenable<UnhandledErrorListener> getUnhandledErrorListenable();

    /**
     * 返回一個(gè)新的CuratorFramework,該CuratorFramework指定了一個(gè)新的命名空間
     */
    public CuratorFramework usingNamespace(String newNamespace);

    /**
     * 獲取命名空間
     */
    public String getNamespace();

    /**
     * 返回托管的zookeeper客戶端
     */
    public CuratorZookeeperClient getZookeeperClient();

    /**
     * 阻塞,直到與ZooKeeper的連接可用或已超過maxWaitTime
     */
    public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException;

    /**
     * 阻塞,直到與ZooKeeper的連接可用。在連接可用或中斷之前,此方法不會(huì)返回,在這種情況下,將拋出InterruptedException
     */
    public void blockUntilConnected() throws InterruptedException;

    /**
     * 返回跟蹤觀察者創(chuàng)建的當(dāng)前實(shí)例的外觀,并允許一次性刪除所有觀察者
     */
    public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework();

    /**
     * 返回配置的錯(cuò)誤策略
     */
    public ConnectionStateErrorPolicy getConnectionStateErrorPolicy();

    /**
     *
     * Current維護(hù)Zookeeper仲裁配置的緩存視圖。
     */
    public QuorumVerifier getCurrentConfig();

    /**
     * 獲取SchemaSet
     */
    SchemaSet getSchemaSet();

    /**
     * 如果此實(shí)例在ZK 3.4.x兼容模式下運(yùn)行,則返回true
     */
    boolean isZk34CompatibilityMode();

}

CuratorFrameworkFactory api介紹如下

public class CuratorFrameworkFactory {

    /**
     * 用于通過建造者模式創(chuàng)建zookeeper客戶端
     */
    public static Builder builder();

    /**
     * 創(chuàng)建zookeeper客戶端
     */
    public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy);

    /**
     * 創(chuàng)建zookeeper客戶端
     */
    public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy);

    /**
     * 將本地地址作為可用作節(jié)點(diǎn)有效負(fù)載的字節(jié)返回
     */
    public static byte[] getLocalAddress();

    public static class Builder {

        /**
         * build CuratorFramework對象 -- zookeeper客戶端
         */
        public CuratorFramework build();

        /**
         * 創(chuàng)建一個(gè)臨時(shí)的CuratorFramework客戶端,CuratorFramework,默認(rèn)3分鐘不活動(dòng)客戶端連接就被關(guān)閉
         */
        public CuratorTempFramework buildTemp();

        /**
         * 創(chuàng)建一個(gè)臨時(shí)的CuratorFramework客戶端,CuratorFramework,可以自己指定多長時(shí)間不活動(dòng)客戶端連接就被關(guān)閉
         */
        public CuratorTempFramework buildTemp(long inactiveThreshold, TimeUnit unit);

        /**
         * 添加zookeeper 訪問權(quán)限
         */
        public Builder authorization(String scheme, byte[] auth);

        public Builder authorization(List<AuthInfo> authInfos);

        /**
         * 設(shè)置zookeeper服務(wù)器列表
         */
        public Builder connectString(String connectString);

        /**
         * zookeeper服務(wù)器地址通過EnsembleProvider(配置提供者)來提供,不能和connectString共同使用
         */
        public Builder ensembleProvider(EnsembleProvider ensembleProvider);

        /**
         * 為每次新建的節(jié)點(diǎn)設(shè)置一個(gè)默認(rèn)值
         */
        public Builder defaultData(byte[] defaultData);

        /**
         * 設(shè)置命名空間,為了實(shí)現(xiàn)不同的Zookeeper業(yè)務(wù)之間的隔離,有的時(shí)候需要為每個(gè)業(yè)務(wù)分配一個(gè)獨(dú)立的命名空間
         */
        public Builder namespace(String namespace)

        /**
         * 會(huì)話超時(shí)時(shí)間,單位毫秒,默認(rèn)60000ms
         */
        public Builder sessionTimeoutMs(int sessionTimeoutMs);

        /**
         * 連接創(chuàng)建超時(shí)時(shí)間,單位毫秒,默認(rèn)60000ms
         */
        public Builder connectionTimeoutMs(int connectionTimeoutMs);

        /**
         * @param maxCloseWaitMs time to wait during close to join background threads
         * @return this
         */
        public Builder maxCloseWaitMs(int maxCloseWaitMs);

        /**
         * 設(shè)置客戶端重連策略
         */
        public Builder retryPolicy(RetryPolicy retryPolicy);

        /**
         * Executor Services的線程工廠
         */
        public Builder threadFactory(ThreadFactory threadFactory);

        /**
         * 壓縮器,用于壓縮和解壓數(shù)據(jù)
         */
        public Builder compressionProvider(CompressionProvider compressionProvider);

        /**
         * ZookeeperFactory 用于創(chuàng)建ZooKeeper
         */
        public Builder zookeeperFactory(ZookeeperFactory zookeeperFactory);

        /**
         * 權(quán)限控制器
         */
        public Builder aclProvider(ACLProvider aclProvider);

        /**
         * 設(shè)置只讀模式
         */
        public Builder canBeReadOnly(boolean canBeReadOnly);

        /**
         * 不讓客戶端,創(chuàng)建節(jié)點(diǎn)的時(shí)候順帶創(chuàng)建父節(jié)點(diǎn)
         */
        public Builder dontUseContainerParents();

        /**
         * 默認(rèn)是StandardConnectionStateErrorPolicy,設(shè)置要使用的錯(cuò)誤策略
         */
        public Builder connectionStateErrorPolicy(ConnectionStateErrorPolicy connectionStateErrorPolicy);

        /**
         * 如果mode為true,則創(chuàng)建ZooKeeper 3.4.x兼容客戶端。如果使用的客戶端庫是ZooKeeper 3.4.x 默認(rèn)情況下已啟用
         */
        public Builder zk34CompatibilityMode(boolean mode);

        /**
         * 更改連接處理策略,默認(rèn)StandardConnectionHandlingPolicy
         */
        public Builder connectionHandlingPolicy(ConnectionHandlingPolicy connectionHandlingPolicy);

        /**
         * 添加強(qiáng)制架構(gòu)集
         */
        public Builder schemaSet(SchemaSet schemaSet);
    }
}

? ? ? ?從上面的CuratorFrameworkFactory api的介紹可以看出CuratorFrameworkFactory對象的創(chuàng)建有兩種方式:

  • 通過過構(gòu)造函數(shù)創(chuàng)建
參數(shù) 類型 含義
connectString String 服務(wù)器列表,格式host1:port1,host2:port2,…
sessionTimeoutMs int 會(huì)話超時(shí)時(shí)間,單位毫秒,默認(rèn)60000ms
connectionTimeoutMs int 連接創(chuàng)建超時(shí)時(shí)間,單位毫秒,默認(rèn)60000ms
retryPolicy RetryPolicy 重試策略,curator已經(jīng)提供了多種重試策略,也可以自行實(shí)現(xiàn)RetryPolicy接口

curator提供的重試策略有:ExponentialBackoffRetry、BoundedExponentialBackoffRetry、RetryForever、RetryNTimes、RetryOneTime、RetryUntilElapsed

  • 通過build創(chuàng)建,關(guān)于build里面的各個(gè)參數(shù)在CuratorFrameworkFactory api里面都順帶介紹了哦。

? ? ? ?比如如下的實(shí)例代碼,連接到127.0.0.1:2181服務(wù)端。

關(guān)于zookeeper的安裝大家可以自己去網(wǎng)上搜下。

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(6000)
                .connectionTimeoutMs(6000)
                .build();

1.2 啟動(dòng)客戶端

? ? ? ?調(diào)用start()方法啟動(dòng)客戶端。這個(gè)時(shí)候zookeeper客戶端才會(huì)去連接zookeeper服務(wù)端。在zookeeper客戶端上做的所有動(dòng)作都需要在start()之后執(zhí)行。如果你不想連接服務(wù)端的時(shí)候可以調(diào)用close()方法斷開連接.

1.3 節(jié)點(diǎn)操作

? ? ? ?首先我們要明確zookeeper里面的節(jié)點(diǎn)結(jié)構(gòu)類似于我們文件系統(tǒng)的結(jié)構(gòu)(就像一棵樹樣的)。除此之外zookeeper的每個(gè)節(jié)點(diǎn)上還可以保存數(shù)據(jù)。zookeeper里面的節(jié)點(diǎn)有四種,不同的節(jié)點(diǎn)類型都有自己的特點(diǎn):

  • CreateMode.PERSISTENT:持久化節(jié)點(diǎn)。
  • CreateMode.PERSISTENT_SEQUENTIAL:持久化并且?guī)蛄刑柟?jié)點(diǎn)。
  • CreateMode.EPHEMERAL:臨時(shí)節(jié)點(diǎn)(客戶端斷開了節(jié)點(diǎn)也就刪除了)
  • CreateMode.EPHEMERAL_SEQUENTIAL:臨時(shí)并且?guī)蛄刑?客戶端斷開了節(jié)點(diǎn)也就刪除了)

1.3.1 創(chuàng)建節(jié)點(diǎn)

? ? ? ?創(chuàng)建節(jié)點(diǎn)很簡單,我們前面已經(jīng)創(chuàng)建了zookeeper客戶端,并且調(diào)了start()方法把客戶端啟動(dòng)起來了。

? ? ? ?比如我們可以用如下的代碼創(chuàng)建一個(gè)持久化的節(jié)點(diǎn)。通過withMode(CreateMode.PERSISTENT)來指定節(jié)點(diǎn)的類型。

    /**
     * 同步 創(chuàng)建持久化節(jié)點(diǎn)
     *
     * @param path 節(jié)點(diǎn)路徑
     * @throws Exception errors
     */
    public void createPersistentNodeSync(String path) throws Exception {
        client.create()
                .creatingParentContainersIfNeeded() // 自動(dòng)遞歸創(chuàng)建父節(jié)點(diǎn)
                .withMode(CreateMode.PERSISTENT)
                .forPath(path);
    }

? ? ? ?我們也可以在創(chuàng)建節(jié)點(diǎn)的同時(shí),給節(jié)點(diǎn)設(shè)置數(shù)據(jù)。


    /**
     * 同步-創(chuàng)建持久化節(jié)點(diǎn)
     *
     * @param path 節(jié)點(diǎn)路徑
     * @param data 節(jié)點(diǎn)對應(yīng)的值
     * @throws Exception errors
     */
    public void createPersistentNodeSync(String path, byte[] data) throws Exception {
        client.create()
                .creatingParentContainersIfNeeded() // 自動(dòng)遞歸創(chuàng)建父節(jié)點(diǎn)
                .withMode(CreateMode.PERSISTENT)
                .forPath(path, data);

    }

1.3.2 刪除節(jié)點(diǎn)

? ? ? ?刪除葉子節(jié)點(diǎn)。(如果不是葉子節(jié)點(diǎn)是會(huì)報(bào)異常的)

    /**
     * 同步-刪除一個(gè)葉子節(jié)點(diǎn)(注意哦,只能刪除葉子節(jié)點(diǎn)否則報(bào)錯(cuò)的)
     *
     * @param path 需要?jiǎng)h除的節(jié)點(diǎn)對應(yīng)的路徑
     * @throws Exception errors
     */
    public void deleteNodeSync(String path) throws Exception {
        client.delete()
                .forPath(path);
    }

? ? ? ?我們也可以刪除整個(gè)節(jié)點(diǎn)(包括節(jié)點(diǎn)下的子節(jié)點(diǎn))。

    /**
     * 同步-刪除一個(gè)節(jié)點(diǎn),并且遞歸刪除其所有的子節(jié)點(diǎn)
     *
     * @param path 需要?jiǎng)h除的節(jié)點(diǎn)對應(yīng)的路基
     * @throws Exception errors
     */
    public void deleteNodeRecursivelySync(String path) throws Exception {
        client.delete()
                .deletingChildrenIfNeeded()
                .forPath(path);

    }

1.3.3 判斷節(jié)點(diǎn)是否存在

? ? ? ?通過節(jié)點(diǎn)的Stat來判斷節(jié)點(diǎn)是否存在。

    /**
     * 同步-檢查節(jié)點(diǎn)是否存在
     *
     * @param path 節(jié)點(diǎn)路徑
     * @return 節(jié)點(diǎn)是否存在
     * @throws Exception errors
     */
    public boolean isNodeExistSync(String path) throws Exception {
        Stat state = client.checkExists()
                .forPath(path);
        return state != null;
    }

1.3.4 節(jié)點(diǎn)數(shù)據(jù)操作

? ? ? ? 讀取節(jié)點(diǎn)數(shù)據(jù)。

    /**
     * 同步-讀取一個(gè)節(jié)點(diǎn)的數(shù)據(jù)內(nèi)容
     *
     * @param path 節(jié)點(diǎn)路基
     * @return 節(jié)點(diǎn)內(nèi)容
     * @throws Exception errors
     */
    public byte[] getNodeDataSync(String path) throws Exception {
        return client.getData()
                .forPath(path);
    }

? ? ? ? 更新節(jié)點(diǎn)數(shù)據(jù),或者設(shè)置null刪除節(jié)點(diǎn)數(shù)據(jù)

    /**
     * 同步-更新一個(gè)節(jié)點(diǎn)的數(shù)據(jù)內(nèi)容
     *
     * @param path 節(jié)點(diǎn)路徑
     * @param data 節(jié)點(diǎn)對應(yīng)數(shù)據(jù)
     * @throws Exception errors
     */
    public void updateNodeDataSync(String path, byte[] data) throws Exception {
        client.setData()
                .forPath(path, data);
    }

1.3.5 獲取節(jié)點(diǎn)的所有子節(jié)點(diǎn)

    /**
     * 同步-獲取某個(gè)節(jié)點(diǎn)的所有子節(jié)點(diǎn)路徑
     *
     * @param path 目錄
     * @return children
     * @throws Exception errors
     */
    public List<String> getChildrenSync(String path) throws Exception {
        return client.getChildren()
                .forPath(path);
    }

1.4 事務(wù)

? ? ? ?事務(wù)相信大家都非常的熟悉。Curator也提供了事務(wù)的支持,一組crud操作要么都成功,要么都失敗。使用起來也非常的簡單。

? ? ? ?一個(gè)事務(wù)里面肯定是有多個(gè)操作的,我們首先要把每個(gè)操作都封裝成CuratorOp。比如如下的實(shí)例,我們把多個(gè)操作放到一個(gè)事務(wù)里面去執(zhí)行.

    @Test
    public void transaction() throws Exception {
        CuratorOp createOp = client.transactionOp().create().forPath("/a/path", "some data".getBytes());
        CuratorOp setDataOp = client.transactionOp().setData().forPath("/another/path", "other data".getBytes());
        CuratorOp deleteOp = client.transactionOp().delete().forPath("/yet/another/path");

        Collection<CuratorTransactionResult> results = client.transaction().forOperations(createOp, setDataOp, deleteOp);
        for (CuratorTransactionResult result : results) {
            System.out.println(result.getForPath() + " - " + result.getType());
        }
    }

1.5 異步操作

? ? ? ?因?yàn)閦ookeeper客戶端的操作都是在和zookeeper服務(wù)端打交道的。涉及到網(wǎng)絡(luò)的調(diào)用。所以有些操作的響應(yīng)就不會(huì)那么及時(shí)了。Curator就給提供了異步操作。異步響應(yīng)操作結(jié)果。

? ? ? ?既然是異步操作,那么肯定需要BackgroundCallback來異步接收操作結(jié)果了。關(guān)于異步操作,我們也舉一個(gè)簡單的例子,我們以創(chuàng)建節(jié)點(diǎn)來舉例(刪除節(jié)點(diǎn),修改節(jié)點(diǎn)數(shù)據(jù),事務(wù)等等其他操作都是一樣的使用)。

    /**
     * 異步-獲取某個(gè)節(jié)點(diǎn)的所有子節(jié)點(diǎn)路徑
     *
     * @param path     目錄
     * @param callback 回調(diào)
     * @throws Exception errors
     */
    public void getChildrenAsync(String path, BackgroundCallback callback) throws Exception {
        client.getChildren()
                .inBackground(callback)
                .forPath(path);
    }

    /**
     * 異步-獲取某個(gè)節(jié)點(diǎn)的所有子節(jié)點(diǎn)路徑
     *
     * @param path     目錄
     * @param callback 回調(diào)
     * @param executor 回調(diào)在哪里執(zhí)行
     * @throws Exception errors
     */
    public void getChildrenAsync(String path, BackgroundCallback callback, Executor executor) throws Exception {
        client.getChildren()
                .inBackground(callback, executor)
                .forPath(path);

    }

二 Curator高級特性

? ? ? ?Curator里面的curator-recipes ja包封裝了一些高級特性,如:Cache事件監(jiān)聽、選舉、分布式鎖、分布式計(jì)數(shù)器、分布式Barrier等等。而且這些特性都是在分布式系統(tǒng)里面常用的功能了。

2.1 Cache事件監(jiān)聽

? ? ? ?Zookeeper原生支持通過注冊Watcher來進(jìn)行事件監(jiān)聽,但是開發(fā)者需要反復(fù)注冊(Watcher只能單次注冊單次使用)。Cache是Curator中對事件監(jiān)聽的包裝,可以看作是對事件監(jiān)聽的本地緩存視圖,能夠自動(dòng)為開發(fā)者處理反復(fù)注冊監(jiān)聽。Curator提供了三種Watcher(Cache)來監(jiān)聽結(jié)點(diǎn)的變化。

2.1.1 Path Cache

? ? ? ?Path Cache用來監(jiān)控子節(jié)點(diǎn).當(dāng)一個(gè)子節(jié)點(diǎn)增加, 更新,刪除時(shí), Path Cache會(huì)改變它的狀態(tài),會(huì)包含最新子節(jié)點(diǎn)的數(shù)據(jù)和狀態(tài),而狀態(tài)的更變將通過PathChildrenCacheListener通知。

? ? ? ?Path Cache的使用非常的簡單,主要涉及到四個(gè)類:

  • PathChildrenCache:Path Cache聽實(shí)現(xiàn)類
  • PathChildrenCacheEvent:子節(jié)點(diǎn)事件
  • PathChildrenCacheListener: 子節(jié)點(diǎn)監(jiān)聽
  • ChildData:子節(jié)點(diǎn)信息

? ? ? ?關(guān)于Path Cache的使用,我們用一個(gè)實(shí)例來簡單的說明下,實(shí)例里面也只是簡單的創(chuàng)建了一個(gè)節(jié)點(diǎn)。最終監(jiān)聽到節(jié)點(diǎn)的創(chuàng)建.

    @Test
    public void pathChildrenCache() throws Exception {

        // 創(chuàng)建zookeeper客戶端
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(6000)
                .connectionTimeoutMs(6000)
                .build();
        // 啟動(dòng)客戶端
        client.start();

        PathChildrenCache cache = new PathChildrenCache(client, "/tuacy/pathCache", true);
        // 添加監(jiān)聽
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("事件類型:" + event.getType());
                if (null != event.getData()) {
                    System.out.println("節(jié)點(diǎn)數(shù)據(jù):" + event.getData().getPath() + " = " + new String(event.getData().getData()));
                }
            }
        });
        cache.start();
        // 添加節(jié)點(diǎn)
        client.create().creatingParentContainersIfNeeded().forPath("/tuacy/pathCache/001");
        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
        cache.close();
    }

2.1.2 Node Cache

? ? ? ?Node Cache與Path Cache類似,Node Cache只是監(jiān)聽某一指定的節(jié)點(diǎn)。子節(jié)點(diǎn)的變化它是不會(huì)管的。

? ? ? ?Node Cache的使用涉及到下面的三個(gè)類:

  • NodeCache - Node Cache實(shí)現(xiàn)類
  • NodeCacheListener - 節(jié)點(diǎn)監(jiān)聽器
  • ChildData - 節(jié)點(diǎn)數(shù)據(jù)

? ? ? ?我們還是用一個(gè)簡單的實(shí)例來說明。

    @Test
    public void nodeCache() throws Exception {

        // 創(chuàng)建zookeeper客戶端
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(6000)
                .connectionTimeoutMs(6000)
                .build();
        // 啟動(dòng)客戶端
        client.start();

        final NodeCache cache = new NodeCache(client, "/tuacy/nodeCache");
        cache.start();
        // 添加監(jiān)聽
        cache.getListenable().addListener(new NodeCacheListener() {

            @Override
            public void nodeChanged() throws Exception {
                ChildData data = cache.getCurrentData();
                if (null != data) {
                    System.out.println("節(jié)點(diǎn)數(shù)據(jù):" + new String(cache.getCurrentData().getData()));
                } else {
                    System.out.println("節(jié)點(diǎn)被刪除!");
                }
            }
        });
        // 添加節(jié)點(diǎn)
        client.create().creatingParentsIfNeeded().forPath("/tuacy/nodeCache");
        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
        client.setData().forPath("/tuacy/nodeCache", "abc".getBytes());
        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
        client.delete().forPath("/tuacy/nodeCache");
        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
        cache.close();
    }

2.1.3 Tree Cache

? ? ? ?Tree Cache可以監(jiān)控整個(gè)樹上的所有節(jié)點(diǎn),就是PathCache和NodeCache的組合功能。

? ? ? ?Tree Cache的使用涉及到下面四個(gè)類。

  • TreeCache - Tree Cache實(shí)現(xiàn)類
  • TreeCacheListener - 監(jiān)聽器類
  • TreeCacheEvent - 觸發(fā)的事件類
  • ChildData - 節(jié)點(diǎn)數(shù)據(jù)

? ? ? ?我們還是以具體的實(shí)例來說明Tree Cache的使用。

    @Test
    public void nodeCache() throws Exception {

        // 創(chuàng)建zookeeper客戶端
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(6000)
                .connectionTimeoutMs(6000)
                .build();
        // 啟動(dòng)客戶端
        client.start();
        final TreeCache cache = TreeCache.newBuilder(client, "/tuacy/treeCache")
                .setCacheData(true)
                .build();
        // 添加監(jiān)聽
        cache.getListenable().addListener(new TreeCacheListener() {

            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                System.out.println("事件類型:" + event.getType() + " | 路徑:" + (null != event.getData() ? event.getData().getPath() : null));
            }
        });
        cache.start();
        // 添加節(jié)點(diǎn)
        client.create().creatingParentsIfNeeded().forPath("/tuacy/treeCache");
        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
        // 給節(jié)點(diǎn)設(shè)置數(shù)據(jù)
        client.setData().forPath("/tuacy/treeCache", "abc".getBytes());
        // 創(chuàng)建子節(jié)點(diǎn)
        client.create().creatingParentsIfNeeded().forPath("/tuacy/treeCache/001");
        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
        // 修改子節(jié)點(diǎn)的數(shù)據(jù)
        client.setData().forPath("/tuacy/treeCache/001", "abc".getBytes());
        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
        // 刪除子節(jié)點(diǎn)
        client.delete().forPath("/tuacy/treeCache/001");
        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
        // 刪除節(jié)點(diǎn)
        client.delete().forPath("/tuacy/treeCache");
        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
        cache.close();
        client.close();
    }

2.2 Leader選舉

? ? ? ?在分布式系統(tǒng)中,選主是一個(gè)很常見的場景(Leader,Slaver真的真的是非常的常見)。

  • 主節(jié)點(diǎn)是唯一的。
  • 各個(gè)節(jié)點(diǎn)獲取主節(jié)點(diǎn)的概率是一樣的,一旦某個(gè)節(jié)點(diǎn)被選為了主節(jié)點(diǎn)(Leader),其他的從節(jié)點(diǎn)(Slaver)也要能感知到。
  • 一旦主節(jié)點(diǎn)斷開,其他的從節(jié)點(diǎn)重新選出一個(gè)主節(jié)點(diǎn)。

2.2.1 LeaderLatch

? ? ? ?在不同的zookeeper客戶端,使用了相同latch path的LeaderLatch,當(dāng)中的一個(gè)最終會(huì)被選舉為leader,可以通過hasLeadership方法查看LeaderLatch實(shí)例是否leade。也可以在LeaderLatchListener里面監(jiān)聽當(dāng)前節(jié)點(diǎn)是否是leader。使用LeaderLatch的時(shí)候如果不想?yún)⑴c選舉了要調(diào)用close()方法退出選舉。

LeaderLatch api介紹

public class LeaderLatch {

    /**
     * 構(gòu)造函數(shù)
     *
     * @param client    CuratorFramework
     * @param latchPath 路徑,所有參與者同一個(gè)路徑
     */
    public LeaderLatch(CuratorFramework client, String latchPath);
    public LeaderLatch(CuratorFramework client, String latchPath, String id);
    public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode);

    /**
     * 參與選舉
     */
    public void start() throws Exception;

    /**
     * 退出選舉
     */
    @Override
    public void close() throws IOException;

    /**
     * 退出選舉
     * 關(guān)閉方式:SILENT : 靜默關(guān)閉,不觸發(fā)相關(guān)監(jiān)聽器、NOTIFY_LEADER :關(guān)閉時(shí)觸發(fā)監(jiān)聽器
     */
    public synchronized void close(CloseMode closeMode) throws IOException;


    /**
     * 添加監(jiān)聽器,監(jiān)聽是否當(dāng)選為leader
     */
    public void addListener(LeaderLatchListener listener);
    public void addListener(LeaderLatchListener listener, Executor executor);

    /**
     * 移除監(jiān)聽器
     */
    public void removeListener(LeaderLatchListener listener);

    /**
     * 嘗試讓當(dāng)前LeaderLatch實(shí)例為leader
     */
    public void await() throws InterruptedException, EOFException
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * 獲取構(gòu)造函數(shù)里面這是的id
     */
    public String getId();

    /**
     * 獲取當(dāng)前LeaderLatch實(shí)例的狀態(tài)
     */
    public State getState();

    /**
     * 返回所有的參與者
     */
    public Collection<Participant> getParticipants() throws Exception;

    /**
     * 返回當(dāng)前l(fā)eader節(jié)點(diǎn)信息
     */
    public Participant getLeader() throws Exception;

    /**
     * 判斷實(shí)例是否是leader
     */
    public boolean hasLeadership();

}

? ? ? ?我們用一個(gè)簡單的實(shí)例來說明LeaderLatch用法,比如我們創(chuàng)建10個(gè)zookeeper客戶端來進(jìn)行選舉。


    @Test
    public void leaderLatch() throws Exception {

        List<CuratorFramework> zookeeperClientList = Lists.newArrayList();
        List<LeaderLatch> leaderLatchList = Lists.newArrayList();

        // 啟動(dòng)10個(gè)zookeeper客戶端
        for (int index = 0; index < 10; index++) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .retryPolicy(retryPolicy)
                    .sessionTimeoutMs(6000)
                    .connectionTimeoutMs(6000)
                    .build();
            // 啟動(dòng)客戶端
            client.start();
            zookeeperClientList.add(client);
        }

        // 這里我們所有的客戶端都參與leader選舉
        for (int index = 0; index < zookeeperClientList.size(); index++) {
            // 所有的客戶端都參與leader選舉
            final LeaderLatch latch = new LeaderLatch(zookeeperClientList.get(index), LEADER_PATH, index + "");
            latch.addListener(new LeaderLatchListener() {
                @Override
                public void isLeader() {
                    System.out.println("我是leader: " + latch.getId());
                }

                @Override
                public void notLeader() {
                    System.out.println("我不是leader: " + latch.getId());
                }
            });
            latch.start();
            leaderLatchList.add(latch);
        }

        // 30S之后
        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
        // 我們找到誰是leader
        String leaderId = leaderLatchList.get(0).getLeader().getId();
        System.out.println("當(dāng)前l(fā)eader id : " + leaderId);
        leaderLatchList.forEach(item -> {
            // 這里我們吧leader退出選舉,讓剩下的重新選舉
            if (item.getId().equals(leaderId)) {
                try {
                    item.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });


        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MINUTES);
        leaderLatchList.forEach(curatorFramework -> {
            // 退出選舉
            try {
                curatorFramework.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        zookeeperClientList.forEach(curatorFramework -> {
            // 關(guān)閉客戶端
            curatorFramework.close();
        });


    }

2.2.2 LeaderSelector

? ? ? ?LeaderSelector也是一個(gè)用于分布式選舉的類,相對于LeaderLatch來說,LeaderSelector更加的靈活點(diǎn)。LeaderSelector使用的時(shí)候主要涉及下面幾個(gè)類:

  • LeaderSelector:LeaderSelector選舉實(shí)例類。
  • LeaderSelectorListener:監(jiān)聽選舉狀態(tài)和連接狀態(tài)
  • LeaderSelectorListenerAdapter:實(shí)現(xiàn)了LeaderSelectorListener接口的一個(gè)抽象類,封裝了客戶端與zk服務(wù)器連接掛起或者斷開時(shí)的處理邏輯(拋出搶主失敗CancelLeadershipException),一般監(jiān)聽器推薦實(shí)現(xiàn)該類。
  • CancelLeadershipException: 搶主失敗異常。

LeaderSelector api 介紹

public class LeaderSelector {

    /**
     * 構(gòu)造函數(shù)
     * @param client     CuratorFramework
     * @param leaderPath 路徑
     * @param listener   監(jiān)聽器
     */
    public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener);
    public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener);
    public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener);


    /**
     * 保證在此實(shí)例釋放領(lǐng)導(dǎo)權(quán)之后還可能獲得領(lǐng)導(dǎo)權(quán)
     */
    public void autoRequeue();

    /**
     * 設(shè)置獲取當(dāng)前實(shí)例對應(yīng)的id
     */
    public void setId(String id);
    public String getId();

    /**
     * 當(dāng)前實(shí)例參與選舉
     */
    public void start();

    /**
     * 重新鍵入到參與者隊(duì)列里面去選舉,如果此實(shí)例已在參與者排隊(duì)里面,則不會(huì)發(fā)生任何操作并返回false。如果實(shí)例未排隊(duì),則重新執(zhí)行該操作并返回true
     */
    public boolean requeue();

    /**
     * 退出選舉
     */
    public synchronized void close();

    /**
     * 獲取所有的參與者
     */
    public Collection<Participant> getParticipants() throws Exception;

    /**
     * 獲取leader
     */
    public Participant getLeader() throws Exception;

    /**
     * 當(dāng)前節(jié)點(diǎn)是否是leader
     */
    public boolean hasLeadership();

    /**
     * 如果當(dāng)前實(shí)例是leader的話,嘗試終斷領(lǐng)導(dǎo)權(quán)
     */
    public synchronized void interruptLeadership();


}

ConnectionStateListener、LeaderSelectorListener

public interface ConnectionStateListener {
    /**
     * 監(jiān)聽網(wǎng)絡(luò)連接問題
     */
    public void stateChanged(CuratorFramework client, ConnectionState newState);
}

/**
 * Notification for leadership
 *
 * @see LeaderSelector
 */
public interface LeaderSelectorListener extends ConnectionStateListener {
    /**
     * 當(dāng)前節(jié)點(diǎn)獲取到leader權(quán)之后調(diào)用,注意:在您希望釋放領(lǐng)導(dǎo)力之前,此方法不應(yīng)返回
     * 所以說如果你想一直占有l(wèi)eader權(quán)利,就在里面寫個(gè)無限循環(huán)吧
     */
    public void         takeLeadership(CuratorFramework client) throws Exception;
}

LeaderSelectorListenerAdapter

/**
 * 實(shí)現(xiàn)了LeaderSelectorListener接口的一個(gè)抽象類,封裝了客戶端與zk服務(wù)器連接掛起或者斷開時(shí)的處理邏輯(拋出搶主失敗CancelLeadershipException),一般監(jiān)聽器推薦實(shí)現(xiàn)該類
 */
public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener {
    /**
     * 當(dāng)遇到SUSPENDED以及LOST時(shí)直接拋出CancelLeadershipException從而去引發(fā)LeaderSelector.interruptLeadership()調(diào)用
     */
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        if ( client.getConnectionStateErrorPolicy().isErrorState(newState) ) {
            throw new CancelLeadershipException();
        }
    }
}

? ? ? ?我們還是用一個(gè)簡單的實(shí)例來說明LeaderSelector的用法,我們還是創(chuàng)建10個(gè)zookeeper客戶端。并且我們創(chuàng)建一個(gè)LeaderSelectorAdapter類,在里面當(dāng)是leader之后的一些處理,如果是leader 10s之后,釋放leader權(quán)力重新選舉。

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter {

    private final LeaderSelector leaderSelector;

    public LeaderSelectorAdapter(CuratorFramework client, String path, String id) {
        // 創(chuàng)建一個(gè)LeaderSelector對象
        leaderSelector = new LeaderSelector(client, path, this);
        // 設(shè)置id
        leaderSelector.setId(id);
        // 保證在此實(shí)例釋放領(lǐng)導(dǎo)權(quán)之后還可能獲得領(lǐng)導(dǎo)權(quán)
        leaderSelector.autoRequeue();
    }

    /**
     * 參與選舉
     */
    public void start() {
        // 參與選舉
        leaderSelector.start();
    }

    /**
     * 退出選舉
     */
    public void close() {
        // 退出選舉
        leaderSelector.close();
    }

    /**
     * 當(dāng)獲得leader的時(shí)候,這個(gè)方法會(huì)被調(diào)用。如果還想繼續(xù)當(dāng)leader,這個(gè)方法不能返回。如果你想要要此實(shí)例一直是leader的話可以加一個(gè)死循環(huán)
     */
    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        System.out.println(leaderSelector.getId() + " 是leader");
        try {
            // 當(dāng)上leader 5s之后,釋放leader權(quán)利
            Thread.sleep(TimeUnit.SECONDS.toMillis(10));
        } catch (InterruptedException e) {
            System.err.println(leaderSelector.getId() + " 被中斷.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(leaderSelector.getId() + " 釋放leader的權(quán)力。");
        }
    }
}
    private static final String LEADER_PATH = "/tuacy/leaderSelector";

    @Test
    public void leaderSelector() throws Exception {
        List<CuratorFramework> zookeeperClientList = Lists.newArrayList();
        List<LeaderSelectorAdapter> leaderLatchList = Lists.newArrayList();

        // 啟動(dòng)10個(gè)zookeeper客戶端
        for (int index = 0; index < 10; index++) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .retryPolicy(retryPolicy)
                    .sessionTimeoutMs(6000)
                    .connectionTimeoutMs(6000)
                    .build();
            // 啟動(dòng)客戶端
            client.start();
            zookeeperClientList.add(client);
        }

        // 這里我們所有的客戶端都參與leader選舉
        for (int index = 0; index < zookeeperClientList.size(); index++) {
            // 所有的客戶端都參與leader選舉
            final LeaderSelectorAdapter latch = new LeaderSelectorAdapter(zookeeperClientList.get(index), LEADER_PATH, index + "");
            latch.start();
            leaderLatchList.add(latch);
        }

        // 1分鐘之后關(guān)掉程序
        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MINUTES);
        leaderLatchList.forEach(curatorFramework -> {
            // 退出選舉
            curatorFramework.close();

        });
        zookeeperClientList.forEach(curatorFramework -> {
            // 關(guān)閉客戶端
            curatorFramework.close();
        });
    }

2.3 分布式鎖

? ? ? ?分布式鎖也是咱們分布式系統(tǒng)里面非常常見的功能了。Curator直接就幫我們做到了,省的我們自己去實(shí)現(xiàn)分布式鎖。

2.3.1 InterProcessMutex

? ? ? ?InterProcessMutex公平鎖、可重入鎖。和ReentrantLock類似。

InterProcessMutex api 介紹

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {

    /**
     * InterProcessMutex的構(gòu)造函數(shù),
     */
    public InterProcessMutex(CuratorFramework client, String path);
    public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver);

    /**
     * 申請獲取鎖
     */
    @Override
    public void acquire() throws Exception;
    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception;

    /**
     *
     * 如果此JVM中的線程獲取了互斥鎖,則返回true
     */
    @Override
    public boolean isAcquiredInThisProcess();

    /**
     * 釋放鎖
     */
    @Override
    public void release() throws Exception;

    /**
     * 返回所有參與獲取鎖的所有當(dāng)前節(jié)點(diǎn)的排序列表
     */
    public Collection<String> getParticipantNodes() throws Exception;

    /**
     * 將鎖設(shè)為可撤銷的. 當(dāng)別的進(jìn)程或線程想讓你釋放鎖是Listener會(huì)被調(diào)用
     */
    @Override
    public void makeRevocable(RevocationListener<InterProcessMutex> listener);
    @Override
    public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor);

    /**
     * 如果調(diào)用線程獲取互斥鎖,則返回true
     */
    public boolean isOwnedByCurrentThread();

}

2.3.2 InterProcessSemaphoreMutex

? ? ? ?InterProcessSemaphoreMutex不可重入鎖。

InterProcessSemaphoreMutex api介紹

public class InterProcessSemaphoreMutex implements InterProcessLock {

    /**
     * 構(gòu)造函數(shù)
     */
    public InterProcessSemaphoreMutex(CuratorFramework client, String path);

    /**
     * 申請獲取鎖
     */
    @Override
    public void acquire() throws Exception;

    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception;

    /**
     * 釋放鎖
     */
    @Override
    public void release() throws Exception;

    /**
     * 如果此JVM中的線程獲取了互斥鎖,則返回true
     */
    @Override
    public boolean isAcquiredInThisProcess();
}

2.3.3 InterProcessReadWriteLock

? ? ? ?InterProcessReadWriteLock 讀寫鎖。和ReadWriteLock類似。

InterProcessReadWriteLock api 介紹

public class InterProcessReadWriteLock {

    /**
     * 讀鎖
     */
    private final InterProcessMutex readMutex;
    /**
     * 寫鎖
     */
    private final InterProcessMutex writeMutex;
    
    /**
     * 構(gòu)造函數(shù)
     */
    public InterProcessReadWriteLock(CuratorFramework client, String basePath)

    /**
     * 構(gòu)造函數(shù)
     * lockData是存儲(chǔ)在節(jié)點(diǎn)上的數(shù)據(jù)
     */
    public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData);

    /**
     * 獲取讀鎖
     */
    public InterProcessMutex     readLock();

    /**
     * 獲取寫鎖
     */
    public InterProcessMutex     writeLock();

}

2.3.4 信號量(InterProcessSemaphoreV2)

? ? ? ?InterProcessSemaphoreV2實(shí)現(xiàn)了一個(gè)跨jvm的信號量,主要工作原理是:acquire時(shí)創(chuàng)建一個(gè)臨時(shí)順序節(jié)點(diǎn),如果創(chuàng)建成功且臨時(shí)節(jié)點(diǎn)數(shù)小于等于maxLeases則說明信號量獲取成功,否則wait等待,等待目錄發(fā)生變化或計(jì)數(shù)改變時(shí)喚醒。和Semaphore的功能類似.

? ? ? ?分布式信號量的使用。我們需要了解以下三個(gè)類。

  • InterProcessSemaphoreV2:信號量實(shí)現(xiàn)類
  • Lease:租約(單個(gè)信號)
  • SharedCountReader:計(jì)數(shù)器,用于計(jì)算最大租約數(shù)量

InterProcessSemaphoreV2 api 介紹

public class InterProcessSemaphoreV2 {

    /**
     * 構(gòu)造函數(shù)
     * @param client    CuratorFramework
     * @param path      節(jié)點(diǎn)路徑
     * @param maxLeases 允許此實(shí)例的最大租約數(shù)
     */
    public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases);

    /**
     * 構(gòu)造函數(shù)
     * @param client CuratorFramework
     * @param path   節(jié)點(diǎn)路徑
     * @param count  用于最大租約的共享計(jì)數(shù)
     */
    public InterProcessSemaphoreV2(CuratorFramework client, String path, SharedCountReader count);

    /**
     * 此信號量創(chuàng)建的節(jié)點(diǎn)放置的數(shù)據(jù),必須在調(diào)用其中一個(gè)acquire()方法之前調(diào)用它
     */
    public void setNodeData(byte[] nodeData);

    /**
     * 返回參與信號量的所有當(dāng)前節(jié)點(diǎn)的列表
     */
    public Collection<String> getParticipantNodes() throws Exception;

    /**
     * 關(guān)閉給定租約集合中的所有租約
     */
    public void returnAll(Collection<Lease> leases);

    /**
     * 關(guān)閉租約
     */
    public void returnLease(Lease lease);

    /**
     * 獲取租約,如果沒有租約獲取會(huì)一直阻塞直到獲取到租約
     */
    public Lease acquire() throws Exception;
    public Lease acquire(long time, TimeUnit unit) throws Exception

    /**
     * 獲取指定數(shù)量的租約,如果沒有獲取到制定數(shù)量租約會(huì)一直阻塞
     */
    public Collection<Lease> acquire(int qty) throws Exception;
    public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception;
    
}

2.3.5 InterProcessMultiLock(多共享鎖對象

? ? ? ?它可以把多個(gè)鎖包含起來像一個(gè)鎖一樣進(jìn)行操作,簡單來說就是對多個(gè)鎖進(jìn)行一組操作。當(dāng)acquire的時(shí)候就獲得多個(gè)鎖資源,否則失敗。同樣調(diào)用release時(shí)所有的鎖都被release(失敗被忽略)。

InterProcessMultiLock api 介紹

public class InterProcessMultiLock implements InterProcessLock {

    /**
     * 構(gòu)造函數(shù)
     *
     * @param client CuratorFramework
     * @param paths 節(jié)點(diǎn)列表對應(yīng)的路徑(多個(gè)路徑就是多個(gè)鎖)
     */
    public InterProcessMultiLock(CuratorFramework client, List<String> paths);

    /**
     * 構(gòu)造函數(shù)
     */
    public InterProcessMultiLock(List<InterProcessLock> locks);

    /**
     * 請求鎖
     */
    @Override
    public void acquire() throws Exception;
    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception;

    /**
     * 釋放鎖
     */
    @Override
    public synchronized void release() throws Exception;

    /**
     * 如果此JVM中的線程獲取了所有的鎖,則返回true
     */
    @Override
    public synchronized boolean isAcquiredInThisProcess();
}

2.4 分布式計(jì)數(shù)器

? ? ? ?計(jì)數(shù)器是用來計(jì)數(shù)的,利用ZooKeeper可以實(shí)現(xiàn)一個(gè)分布式計(jì)數(shù)器。只要使用相同的path就可以得到最新的計(jì)數(shù)器值,這是由ZooKeeper的一致性保證的。Curator有兩個(gè)計(jì)數(shù)器,一個(gè)是用int來計(jì)數(shù)(SharedCount),一個(gè)用long來計(jì)數(shù)(DistributedAtomicLong)。

2.4.1 SharedCount(int計(jì)數(shù)器)

? ? ? ?SharedCount使用int類型來計(jì)數(shù)。相當(dāng)于多個(gè)zookeeper客戶端公用一個(gè)計(jì)算器。

  • SharedCount:計(jì)數(shù)器的具體實(shí)現(xiàn)。
  • SharedCountListener:監(jiān)聽數(shù)據(jù)的改變。

SharedCount api 介紹

public class SharedCount implements Closeable, SharedCountReader, Listenable<SharedCountListener> {


    /**
     * 構(gòu)造函數(shù)
     * @param client CuratorFramework
     * @param path 計(jì)數(shù)器依賴的節(jié)點(diǎn)
     * @param seedValue 如果當(dāng)前節(jié)點(diǎn)對應(yīng)的計(jì)數(shù)器沒有值,就會(huì)用該值
     */
    public SharedCount(CuratorFramework client, String path, int seedValue);
    protected SharedCount(CuratorFramework client, String path, SharedValue sv);

    /**
     * 獲取當(dāng)前計(jì)數(shù)
     */
    @Override
    public int getCount();

    /**
     * 獲取當(dāng)前節(jié)點(diǎn)對應(yīng)的版本信息
     */
    @Override
    public VersionedValue<Integer> getVersionedValue();

    /**
     * 設(shè)置計(jì)數(shù)器的值
     */
    public void     setCount(int newCount) throws Exception;

    /**
     * 設(shè)置計(jì)數(shù)器的值,這里要注意如果當(dāng)前版本的值在這個(gè)時(shí)刻有改變則設(shè)置不成功。CAS操作
     */
    public boolean  trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception;

    /**
     * 添加監(jiān)聽器
     */
    @Override
    public void     addListener(SharedCountListener listener);
    @Override
    public void     addListener(final SharedCountListener listener, Executor executor);

    /**
     * 移除監(jiān)聽器
     */
    @Override
    public void     removeListener(SharedCountListener listener);

    /**
     * 啟動(dòng)
     */
    public void     start() throws Exception;

    /**
     * 結(jié)束
     */
    @Override
    public void close() throws IOException;
}

? ? ? ?SharedCount使用實(shí)例。模擬了10個(gè)zookeeper客戶端。每個(gè)客戶端都加5次。最終結(jié)果50就對了。

public class SharedCountTest {

    private static final String PATH_COUNTER = "/int/counter";

    class CounterThread extends Thread {

        private final CountDownLatch countDownLatch;
        private final int threadIndex;
        private final SharedCount counter;

        CounterThread(SharedCount counter, int index, CountDownLatch countDownLatch) {
            this.counter = counter;
            this.threadIndex = index;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                for (int index = 0; index < 5; index++) {
                    while (true) {
                        Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
                        boolean success = counter.trySetCount(counter.getVersionedValue(), counter.getCount() + 1);
                        if (success) {
                            break;
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    System.out.println("當(dāng)前值為:" + counter.getCount());
                    counter.close();
                } catch (Exception e) {
                    //ignore
                }
                countDownLatch.countDown();
            }

        }
    }

    @Test
    public void sharedCount() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        List<CuratorFramework> zookeeperClientList = Lists.newArrayList();

        // 啟動(dòng)10個(gè)zookeeper客戶端
        for (int index = 0; index < 10; index++) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .retryPolicy(retryPolicy)
                    .sessionTimeoutMs(6000)
                    .connectionTimeoutMs(6000)
                    .build();
            // 啟動(dòng)客戶端
            client.start();
            zookeeperClientList.add(client);
        }

        // 如果節(jié)點(diǎn)存在,我們就刪除節(jié)點(diǎn)
        zookeeperClientList.get(0).delete().forPath(PATH_COUNTER);

        for (int index = 0; index < zookeeperClientList.size(); index++) {
            SharedCount sharedCount = new SharedCount(zookeeperClientList.get(index), PATH_COUNTER, 0);
            sharedCount.addListener(new SharedCountListener() {
                @Override
                public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
                    System.out.println("計(jì)數(shù)器值改變,現(xiàn)在的值為:" + newCount);
                }

                @Override
                public void stateChanged(CuratorFramework client, ConnectionState newState) {
                    // 連接狀態(tài)改變
                }
            });
            sharedCount.start();
            new CounterThread(sharedCount, index, countDownLatch).start();
        }

        countDownLatch.await();
        zookeeperClientList.forEach(curatorFramework -> {
            // 關(guān)閉客戶端
            curatorFramework.close();
        });
    }

}

2.4.2 DistributedAtomicLong(long計(jì)數(shù)器)

? ? ? ?DistributedAtomicLong使用Long類型來計(jì)數(shù)。

DistributedAtomicLong api 介紹

public class DistributedAtomicLong implements DistributedAtomicNumber<Long> {

    /**
     * 構(gòu)造函數(shù)(樂觀鎖模式)
     *
     * @param client CuratorFramework
     * @param counterPath 節(jié)點(diǎn)路徑
     * @param retryPolicy 重試策略 -- 樂觀加鎖
     */
    public DistributedAtomicLong(CuratorFramework client, String counterPath, RetryPolicy retryPolicy);

    /**
     * 構(gòu)造函數(shù),retryPolicy(樂觀加鎖)還沒成功,則進(jìn)行promotedToLock的方式以互斥的方式加鎖重試 (排他鎖模式)
     *
     * @param client CuratorFramework
     * @param counterPath 節(jié)點(diǎn)路徑
     * @param retryPolicy 重試策略 -- 樂觀加鎖
     * @param promotedToLock 排他鎖策略
     */
    public DistributedAtomicLong(CuratorFramework client, String counterPath, RetryPolicy retryPolicy, PromotedToLock promotedToLock);

    /**
     * 獲取當(dāng)前值
     */
    @Override
    public AtomicValue<Long>     get() throws Exception

    /**
     * 強(qiáng)制設(shè)置計(jì)數(shù)值
     */
    @Override
    public void forceSet(Long newValue) throws Exception;

    /**
     * CAS更新(樂觀鎖模式更新)
     */
    @Override
    public AtomicValue<Long> compareAndSet(Long expectedValue, Long newValue) throws Exception;

    /**
     * 設(shè)置值
     */
    @Override
    public AtomicValue<Long>   trySet(Long newValue) throws Exception;

    /**
     * 如果之前沒有初始值,則把初始值設(shè)置進(jìn)去
     */
    @Override
    public boolean initialize(Long initialize) throws Exception;

    /**
     * +1
     */
    @Override
    public AtomicValue<Long>    increment() throws Exception;

    /**
     * -1
     */
    @Override
    public AtomicValue<Long>    decrement() throws Exception;

    /**
     * 加一個(gè)指定的值
     */
    @Override
    public AtomicValue<Long>    add(Long delta) throws Exception;

    /**
     * 鍵一個(gè)指定的值
     */
    @Override
    public AtomicValue<Long> subtract(Long delta) throws Exception;


}

? ? ? ?DistributedAtomicLong怎么使用,直接給實(shí)例。也是模擬10個(gè)客戶端,每個(gè)客戶端增加5次。最終結(jié)果得到50就對了。

public class DistributedAtomicLongTest {

    private static final String PATH_COUNTER = "/long/counter";

    class CounterThread extends Thread {

        private final CountDownLatch countDownLatch;
        private final int threadIndex;
        private final DistributedAtomicLong counter;

        CounterThread(DistributedAtomicLong counter, int index, CountDownLatch countDownLatch) {
            this.counter = counter;
            this.threadIndex = index;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                for (int index = 0; index < 5; index++) {
                    // 保證成功
                    while (true) {
                        AtomicValue<Long> value = counter.increment();
                        if (value.succeeded()) {
                            System.out.println("succeed: " + value.succeeded() + " value:" + value.postValue());
                            break;
                        }
                        Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);

                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }

        }
    }

    @Test
    public void distributedAtomicLong() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        List<CuratorFramework> zookeeperClientList = Lists.newArrayList();

        // 啟動(dòng)10個(gè)zookeeper客戶端
        for (int index = 0; index < 10; index++) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .retryPolicy(retryPolicy)
                    .sessionTimeoutMs(6000)
                    .connectionTimeoutMs(6000)
                    .build();
            // 啟動(dòng)客戶端
            client.start();
            zookeeperClientList.add(client);
        }

        // 如果節(jié)點(diǎn)存在,我們就刪除節(jié)點(diǎn)
        if (zookeeperClientList.get(0).checkExists().forPath(PATH_COUNTER) != null) {
            zookeeperClientList.get(0).delete().forPath(PATH_COUNTER);
        }

        for (int index = 0; index < zookeeperClientList.size(); index++) {
            // 樂觀鎖模式
            DistributedAtomicLong count = new DistributedAtomicLong(zookeeperClientList.get(index), PATH_COUNTER, new RetryNTimes(10, 10));
            boolean initializeSuccess = count.initialize(0L);
            if (initializeSuccess) {
                System.out.println("初始化成功");
            } else {
                System.out.println("初始化失敗");
            }
            new CounterThread(count, index, countDownLatch).start();
        }

        countDownLatch.await();
        zookeeperClientList.forEach(curatorFramework -> {
            // 關(guān)閉客戶端
            curatorFramework.close();
        });
    }

}

2.5 分布式隊(duì)列

2.5.4 簡單隊(duì)列 - SimpleDistributedQueue

? ? ? ?SimpleDistributedQueue是一種簡單隊(duì)列,和jdk中隊(duì)列類似,擁有offer()、take()方法。

? ? ? ?SimpleDistributedQueue的使用是很簡單的,所以我們就直接給出SimpleDistributedQueue的使用實(shí)例了。

public class SimpleDistributedQueueTest {

    private static final String SIMPLE_DISTRIBUTED_QUEUE_PATH = "/SimpleDistributedQueue";

    class QueueActionThread extends Thread {

        private final SimpleDistributedQueue queue;
        private final CountDownLatch countDownLatch;
        private final int queueIndex;

        QueueActionThread(SimpleDistributedQueue queue, int index, CountDownLatch countDownLatch) {
            this.queue = queue;
            this.queueIndex = index;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                for (int index = 0; index < 5; index++) {
                    String message = "我是隊(duì)列:" + queueIndex + " 的第-" + index + "-條消息";
                    this.queue.offer(message.getBytes());
                }
                Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
                for (int index = 0; index < 5; index++) {
                    byte[] queueItem = queue.take();
                    System.out.println("我是隊(duì)列:" + queueIndex + " 我收到了:" + new String(queueItem));
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }

        }
    }

    @Test
    public void simpleDistributedQueue() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        List<CuratorFramework> zookeeperClientList = Lists.newArrayList();

        // 啟動(dòng)10個(gè)zookeeper客戶端
        for (int index = 0; index < 10; index++) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .retryPolicy(retryPolicy)
                    .sessionTimeoutMs(6000)
                    .connectionTimeoutMs(6000)
                    .build();
            // 啟動(dòng)客戶端
            client.start();
            zookeeperClientList.add(client);
        }

        for (int index = 0; index < zookeeperClientList.size(); index++) {
            SimpleDistributedQueue queue = new SimpleDistributedQueue(zookeeperClientList.get(index), SIMPLE_DISTRIBUTED_QUEUE_PATH);
            new QueueActionThread(queue, index, countDownLatch).start();
        }

        countDownLatch.await();
        // 關(guān)閉客戶端
        zookeeperClientList.forEach(CuratorFramework::close);
    }

}

2.5.2 普通隊(duì)列 - DistributedQueue

? ? ? ?DistributedQueue是一種非常譜圖的隊(duì)列,沒啥騷操作。

? ? ? ?DistributedQueue的使用也是非常簡單的,我們也直接給出DistributedQueue的使用實(shí)例。

public class DistributedQueueTest {

    private static final String DISTRIBUTED_QUEUE_PATH = "/queue/distributedQueue";

    class QueueActionThread extends Thread {

        private final DistributedQueue<String> queue;
        private final CountDownLatch countDownLatch;
        private final int queueIndex;

        QueueActionThread(DistributedQueue<String> queue, int index, CountDownLatch countDownLatch) {
            this.queue = queue;
            this.queueIndex = index;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                this.queue.start();
                for (int index = 0; index < 5; index++) {
                    queue.put("隊(duì)列 " + queueIndex + " 發(fā)來的消息:" + index);
                    Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }

        }
    }



    @Test
    public void distributedQueue() throws Exception {

        CountDownLatch countDownLatch = new CountDownLatch(10);
        List<CuratorFramework> zookeeperClientList = Lists.newArrayList();

        // 啟動(dòng)10個(gè)zookeeper客戶端
        for (int index = 0; index < 10; index++) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .retryPolicy(retryPolicy)
                    .sessionTimeoutMs(6000)
                    .connectionTimeoutMs(6000)
                    .build();
            // 啟動(dòng)客戶端
            client.start();
            zookeeperClientList.add(client);
        }

        for (int index = 0; index < zookeeperClientList.size(); index++) {
            QueueBuilder<String> queueBuild = QueueBuilder.builder(zookeeperClientList.get(index), index % 2 == 0 ? new ConsumerImp(index + "") : null, createQueueSerializer(), DISTRIBUTED_QUEUE_PATH);
            DistributedQueue<String> queue = queueBuild.buildQueue();
            new QueueActionThread(queue, index, countDownLatch).start();
        }

        countDownLatch.await();
        // 關(guān)閉客戶端
        zookeeperClientList.forEach(CuratorFramework::close);

    }

    /**
     * 隊(duì)列消息序列化實(shí)現(xiàn)類
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    private class ConsumerImp implements QueueConsumer<String>{

        private String consumerName;

        public ConsumerImp(String consumerName) {
            this.consumerName = consumerName;
        }

        @Override
        public void consumeMessage(String message) throws Exception {
            System.out.println(consumerName + " 收到消息: " + message);
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {

        }
    }

}

2.5.3 帶id的隊(duì)列 - DistributedIdQueue

? ? ? ?DistributedIdQueue相對于DistributedQueue來說就是隊(duì)列里面的每個(gè)id都帶有一個(gè)id。所以DistributedIdQueue可以根據(jù)id刪除隊(duì)列里面的數(shù)據(jù)。其他部分和DistributedQueue一樣。實(shí)例我們就不寫了。

2.5.4 優(yōu)先級隊(duì)列 - DistributedPriorityQueue

? ? ? ?DistributedPriorityQueue是帶有優(yōu)先級的隊(duì)列,優(yōu)先級別高的先消費(fèi)。使用和DistributedQueue是差不多的。實(shí)例我們就不寫了。

2.5.4 延遲隊(duì)列 - DistributedDelayQueue

? ? ? ?DistributedDelayQueue是帶有延時(shí)功能的隊(duì)列。消息入隊(duì)的時(shí)候可以指定延時(shí)時(shí)間。讓該消息延時(shí)一段時(shí)間之后才可以被消費(fèi)。用法和DistributedQueue差不多。就不寫具體的實(shí)例代碼了。

2.6 分布式屏障 - Barrier

? ? ? ?分布式Barrier是這樣一個(gè)功能:它會(huì)阻塞所有節(jié)點(diǎn)上的等待進(jìn)程,直到某一個(gè)被滿足, 然后所有的節(jié)點(diǎn)繼續(xù)進(jìn)行。

2.6.1 DistributedBarrier

? ? ? ?DistributedBarrier允許多個(gè)分布式線程任務(wù)等待放行。直到有地方說放行則這些分布式線程進(jìn)入執(zhí)行任務(wù)。

DistributedBarrier api 介紹。

public class DistributedBarrier {

    /**
     * @param client CuratorFramework
     * @param barrierPath barrier路徑節(jié)點(diǎn)
     */
    public DistributedBarrier(CuratorFramework client, String barrierPath);

    /**
     * 設(shè)置柵欄,它將阻塞在它上面等待的線程:
     */
    public synchronized void         setBarrier() throws Exception;

    /**
     * 設(shè)置柵欄
     */
    public synchronized void      removeBarrier() throws Exception;

    /**
     * 等待放行條件
     */
    public synchronized void      waitOnBarrier() throws Exception
    public synchronized boolean      waitOnBarrier(long maxWait, TimeUnit unit) throws Exception;
}

? ? ? ?DistributedBarrier的使用。比如這里我們模擬了10個(gè)zookeeper客戶端。等待放行。

public class DistributedBarrierTest {

    private static final String BARRIER_PATH_COUNTER = "/barrier";

    class LogicThread extends Thread {

        private final CountDownLatch countDownLatch;
        private final int threadIndex;
        private final DistributedBarrier barrier;

        LogicThread(DistributedBarrier barrier, int index, CountDownLatch countDownLatch) {
            this.barrier = barrier;
            this.threadIndex = index;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                System.out.println("線程: " + threadIndex + " 請求進(jìn)入");
                // 阻塞等待
                barrier.waitOnBarrier();
                System.out.println("線程: " + threadIndex + " 成功進(jìn)入");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }

        }
    }

    @Test
    public void distributedBarrier() throws Exception {

        CountDownLatch countDownLatch = new CountDownLatch(10);
        List<CuratorFramework> zookeeperClientList = Lists.newArrayList();

        // 啟動(dòng)10個(gè)zookeeper客戶端
        for (int index = 0; index < 10; index++) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .retryPolicy(retryPolicy)
                    .sessionTimeoutMs(6000)
                    .connectionTimeoutMs(6000)
                    .build();
            // 啟動(dòng)客戶端
            client.start();
            zookeeperClientList.add(client);
        }

        // 如果節(jié)點(diǎn)存在,我們就刪除節(jié)點(diǎn)
        if (zookeeperClientList.get(0).checkExists().forPath(BARRIER_PATH_COUNTER) != null) {
            zookeeperClientList.get(0).delete().forPath(BARRIER_PATH_COUNTER);
        }
        DistributedBarrier controlBarrier = new DistributedBarrier(zookeeperClientList.get(0), BARRIER_PATH_COUNTER);
        controlBarrier.setBarrier();
        for (int index = 0; index < zookeeperClientList.size(); index++) {
            DistributedBarrier barrier = new DistributedBarrier(zookeeperClientList.get(index), BARRIER_PATH_COUNTER);
            new LogicThread(barrier, index, countDownLatch).start();
        }

        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);
        controlBarrier.removeBarrier();
        countDownLatch.await();
        zookeeperClientList.forEach(curatorFramework -> {
            // 關(guān)閉客戶端
            curatorFramework.close();
        });

    }

}

2.6.2 DistributedDoubleBarrier

? ? ? ?DistributedDoubleBarrier:類似CyclicBarrier 。允許多個(gè)分布式線程等待,等線程個(gè)數(shù)達(dá)到了指定數(shù)量的時(shí)候,就可以同時(shí)執(zhí)行或者同時(shí)退出了。

DistributedDoubleBarrier api 的使用

public class DistributedDoubleBarrier {

    /**
     * 構(gòu)造函數(shù),
     * memberQty是成員數(shù)量,當(dāng)enter()方法被調(diào)用時(shí),成員被阻塞,直到所有的成員都調(diào)用了enter()
     * 當(dāng)leave()方法被調(diào)用時(shí),它也阻塞調(diào)用線程,直到所有的成員都調(diào)用了leave()
     */
    public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty);

    /**
     * 進(jìn)入柵欄并且阻塞,直到所有的成員都進(jìn)入
     */
    public void     enter() throws Exception;
    public boolean     enter(long maxWait, TimeUnit unit) throws Exception;

    /**
     * 退出柵欄并且阻塞,知道所有的成員都退出
     */
    public synchronized void     leave() throws Exception;
    public synchronized boolean     leave(long maxWait, TimeUnit unit) throws Exception;
    
}

? ? ? ?DistributedDoubleBarrier的簡單使用,我們模擬10個(gè)zookeeper客戶端。當(dāng)有五個(gè)說要執(zhí)行或者退出的時(shí)候。我們就執(zhí)行或者退出。

public class DistributedDoubleBarrierTest {

    private static final String BARRIER_PATH_COUNTER = "/barrier";

    class LogicThread extends Thread {

        private final CountDownLatch countDownLatch;
        private final int threadIndex;
        private final DistributedDoubleBarrier barrier;

        LogicThread(DistributedDoubleBarrier barrier, int index, CountDownLatch countDownLatch) {
            this.barrier = barrier;
            this.threadIndex = index;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                Uninterruptibles.sleepUninterruptibly(5 * threadIndex, TimeUnit.SECONDS);
                System.out.println("線程:" + threadIndex + " 請求進(jìn)入");
                barrier.enter();
                System.out.println("線程:" + threadIndex + " 成功進(jìn)入");

                System.out.println("線程:" + threadIndex + " 請求離開");
                barrier.leave();
                System.out.println("線程:" + threadIndex + " 成功離開");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }

        }
    }

    @Test
    public void distributedDoubleBarrier() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        List<CuratorFramework> zookeeperClientList = Lists.newArrayList();

        // 啟動(dòng)10個(gè)zookeeper客戶端
        for (int index = 0; index < 10; index++) {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .retryPolicy(retryPolicy)
                    .sessionTimeoutMs(6000)
                    .connectionTimeoutMs(6000)
                    .build();
            // 啟動(dòng)客戶端
            client.start();
            zookeeperClientList.add(client);
        }

        // 如果節(jié)點(diǎn)存在,我們就刪除節(jié)點(diǎn)
        if (zookeeperClientList.get(0).checkExists().forPath(BARRIER_PATH_COUNTER) != null) {
            zookeeperClientList.get(0).delete().forPath(BARRIER_PATH_COUNTER);
        }

        for (int index = 0; index < zookeeperClientList.size(); index++) {
            DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(zookeeperClientList.get(index), BARRIER_PATH_COUNTER, 5);
            new LogicThread(barrier, index, countDownLatch).start();
        }

        countDownLatch.await();
        zookeeperClientList.forEach(curatorFramework -> {
            // 關(guān)閉客戶端
            curatorFramework.close();
        });
    }

}

三 Spring Boot使用Curator

? ? ? ?Spring Boot中使用Curator,我們要想辦法創(chuàng)建一個(gè)zookeeper客戶端.然后把這個(gè)客戶端對象添加到Spring容器中去.這樣我們就可以在各個(gè)地方拿到這個(gè)zookeeper客戶端對象.

? ? ? ?說先我們創(chuàng)建一個(gè)ZkClient類.這個(gè)ZkClient類就代碼我們一個(gè)zookeeper客戶端.

public class ZkClient {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * zookeeper客戶端實(shí)例
     */
    private CuratorFramework client;
    /**
     * 服務(wù)器列表,格式host1:port1,host2:port2,...
     */
    private String zookeeperServer;
    /**
     * 會(huì)話超時(shí)時(shí)間,單位毫秒,默認(rèn)60000ms
     */
    private int sessionTimeoutMs;
    /**
     * 連接創(chuàng)建超時(shí)時(shí)間,單位毫秒,默認(rèn)60000ms
     */
    private int connectionTimeoutMs;
    /**
     * 重試之間等待的初始時(shí)間
     */
    private int baseSleepTimeMs;
    /**
     * 當(dāng)連接異常時(shí)的重試次數(shù)
     */
    private int maxRetries;
    /**
     * 為了實(shí)現(xiàn)不同的Zookeeper業(yè)務(wù)之間的隔離,有的時(shí)候需要為每個(gè)業(yè)務(wù)分配一個(gè)獨(dú)立的命名空間
     */
    private String namespace;

    public void setZookeeperServer(String zookeeperServer) {
        this.zookeeperServer = zookeeperServer;
    }
    

    public void setSessionTimeoutMs(int sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    public void setConnectionTimeoutMs(int connectionTimeoutMs) {
        this.connectionTimeoutMs = connectionTimeoutMs;
    }
    public void setBaseSleepTimeMs(int baseSleepTimeMs) {
        this.baseSleepTimeMs = baseSleepTimeMs;
    }

    public void setMaxRetries(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }

    /**
     * spring 自動(dòng)調(diào)用,不需要我們主動(dòng)調(diào)用
     */
    public void init() {
        // 創(chuàng)建客戶端
        // 重連規(guī)則
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
        client = CuratorFrameworkFactory.builder()
                .connectString(zookeeperServer)
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(sessionTimeoutMs)
                .connectionTimeoutMs(connectionTimeoutMs)
                .namespace(namespace)
                .build();
        // 啟動(dòng)客戶端,連接服務(wù)器
        client.start();
    }

    /**
     * spring 自動(dòng)調(diào)用,不需要我們主動(dòng)調(diào)用
     */
    public void stop() {
        // 關(guān)閉客戶端
        client.close();
    }

    /**
     * 獲取 zookeeper 客戶端對象
     *
     * @return CuratorFramework
     */
    public CuratorFramework getClient() {
        return client;
    }

}

? ? ? ?接下來我們把ZkClient添加到Srping容器里面去.而且這里我們把一些動(dòng)態(tài)配置信息都放到了application.yml文件里面去了.

@Configuration
public class ZkConfiguration {

    /**
     * 服務(wù)器列表,格式host1:port1,host2:port2,...
     */
    @Value("${zookeeper.server}")
    private String zookeeperServer;
    /**
     * 會(huì)話超時(shí)時(shí)間,單位毫秒,默認(rèn)60000ms
     */
    @Value(("${zookeeper.sessionTimeoutMs}"))
    private int sessionTimeoutMs;
    /**
     * 連接創(chuàng)建超時(shí)時(shí)間,單位毫秒,默認(rèn)60000ms
     */
    @Value("${zookeeper.connectionTimeoutMs}")
    private int connectionTimeoutMs;
    /**
     * 當(dāng)連接異常時(shí)的重試次數(shù)
     */
    @Value("${zookeeper.maxRetries}")
    private int maxRetries;
    /**
     * 重試之間等待的初始時(shí)間
     */
    @Value("${zookeeper.baseSleepTimeMs}")
    private int baseSleepTimeMs;
    /**
     * 為了實(shí)現(xiàn)不同的Zookeeper業(yè)務(wù)之間的隔離,有的時(shí)候需要為每個(gè)業(yè)務(wù)分配一個(gè)獨(dú)立的命名空間
     */
    @Value("${zookeeper.namespace}")
    private String namespace;

    @Bean(initMethod = "init", destroyMethod = "stop")
    public ZkClient zkClient() {
        ZkClient zkClient = new ZkClient();
        zkClient.setZookeeperServer(zookeeperServer);
        zkClient.setSessionTimeoutMs(sessionTimeoutMs);
        zkClient.setConnectionTimeoutMs(connectionTimeoutMs);
        zkClient.setMaxRetries(maxRetries);
        zkClient.setBaseSleepTimeMs(baseSleepTimeMs);
        zkClient.setNamespace(namespace);
        return zkClient;
    }

}

application.yml文件增加配置信息


# zeekeeper配置
zookeeper:
  server: 127.0.0.1:2181 # 服務(wù)器列表,格式host1:port1,host2:port2,...
  sessionTimeoutMs: 6000 # 會(huì)話超時(shí)時(shí)間,單位毫秒,默認(rèn)60000ms
  connectionTimeoutMs: 6000 # 連接創(chuàng)建超時(shí)時(shí)間,單位毫秒,默認(rèn)60000ms
  maxRetries: 3 # 當(dāng)連接異常時(shí)的重試次數(shù)
  baseSleepTimeMs: 1000 # 重試之間等待的初始時(shí)間
  namespace: lock # 為了實(shí)現(xiàn)不同的Zookeeper業(yè)務(wù)之間的隔離,有的時(shí)候需要為每個(gè)業(yè)務(wù)分配一個(gè)獨(dú)立的命名空間,不需要的時(shí)候可以去掉

? ? ? ?這樣我們就可以在我們項(xiàng)目里面的任何地方得到ZkClient對象了.我們可以在zookeeper客戶端為所欲為了.


? ? ? ?到此關(guān)于java zookeeper客戶端Curator的使用部分就講完了.文章中設(shè)計(jì)到的所有實(shí)例代碼在 https://github.com/tuacy/java-study工程目錄的zookeeper文件下面可以找到.

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容