[Zookeeper] 客戶端Curator

1 Zookeeper原生客戶端不足

  • 在連接zk超時(shí)的時(shí)候,不支持自動(dòng)重連,需要手動(dòng)重連
  • Watch注冊(cè)一次就會(huì)失效,需要反復(fù)注冊(cè)
  • 不支持遞歸創(chuàng)建節(jié)點(diǎn)
  • Curator能提供更多方案并且實(shí)現(xiàn)簡(jiǎn)答,例如分布式鎖

2 會(huì)話創(chuàng)建

  1. 使用CuratorFrameworkFactory這個(gè)工廠類的兩個(gè)靜態(tài)方法來(lái)創(chuàng)建一個(gè)客戶端,如builder().build()或者newClient(),返回CuratorFramework對(duì)象

  2. 通過(guò)調(diào)用CuratorFramework中的start()方法來(lái)啟動(dòng)會(huì)話

CuratorFramework curatorFramework= CuratorFrameworkFactory
                .newClient("",new ExponentialBackoffRetry(1000,3));
        
curatorFramework.start();
CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(connectInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();

client.start();

3 節(jié)點(diǎn)操作

// 創(chuàng)建數(shù)據(jù)節(jié)點(diǎn)/home,內(nèi)容為test
client.create().forPath("/home","test".getBytes());
// 更新數(shù)據(jù)節(jié)點(diǎn)數(shù)據(jù)
client.setData().forPath("/home","test1".getBytes());
// 獲取數(shù)據(jù)節(jié)點(diǎn)信息
client.getData().forPath("/home");
// 獲取數(shù)據(jù)節(jié)點(diǎn)權(quán)限
client.getACL().forPath("/home");
// 更新數(shù)據(jù)節(jié)點(diǎn)權(quán)限client.setACL().withACL(list).forPath("/home");
// 刪除數(shù)據(jù)節(jié)點(diǎn)/home
client.delete().forPath("/home");

4 代碼演示

4.1 添加pom依賴

        <!--對(duì)zookeeper的底層api的一些封裝-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <!--封裝了一些高級(jí)特性,如:Cache時(shí)間監(jiān)聽(tīng),選舉,分布式鎖,分布式barrier-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

4.2 代碼分析

  1. 連接重試策略
  • RetryForever:一直重試,直至連接成功,一般不使用
  • RetryNTime:指定重連的次數(shù)N
  • RetryUtilElapsed:指定最大重連超時(shí)時(shí)間和重連時(shí)間間隔,間歇性重連直到超時(shí)或者連接成功
  • ExponentialBackoffRetry:基于“backoff”方式重連,重連的時(shí)間間隔是動(dòng)態(tài)的
  • BoundedExponentialBackoffRetry: 同ExponentialBackoffRetry,增加了最大重試次數(shù)的控制
  1. 默認(rèn)Curator創(chuàng)建節(jié)點(diǎn)是永久節(jié)點(diǎn),如果需要?jiǎng)?chuàng)建臨時(shí)節(jié)點(diǎn)
client.create().withMode(CreateMode.EPHEMERAL).forPath("/aa");

一般執(zhí)行完畢之后,session就會(huì)關(guān)閉了,創(chuàng)建的臨時(shí)節(jié)點(diǎn)就不存在了。


creatingParentsIfNeeded()方法的意思是如果父節(jié)點(diǎn)不存在,則在創(chuàng)建節(jié)點(diǎn)的同時(shí)創(chuàng)建父節(jié)點(diǎn)

client.create().creatingParentsIfNeeded().forPath("/test");

舊版本中 creatingParentContainersIfNeeded()等于creatingParentsIfNeeded();
新版本中 creatingParentContainersIfNeeded()以容器模式遞歸創(chuàng)建節(jié)點(diǎn)


刪除節(jié)點(diǎn)時(shí),如果當(dāng)前節(jié)點(diǎn)還存在子節(jié)點(diǎn),那么刪除時(shí)會(huì)報(bào)錯(cuò),如果要?jiǎng)h除的話,就得加上deletingChildrenIfNeeded()

client.delete().deletingChildrenIfNeeded().forPath("/test");

在連接失敗的情況下,可能刪除失敗,如果確定刪除的,則加上guaranteed(),則在不停重試中,不停去刪除節(jié)點(diǎn)。


使用Stat,則利用storingStatIn

 Stat stat=new Stat();
byte[] bytes1=client.getData().storingStatIn(stat).forPath("/aa/bb");
System.out.println(new String(bytes1));
System.out.println(stat.getCversion());

5 分布式鎖

public class DistributedLockTest {


    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString("localhost:2181")
                .connectionTimeoutMs(5000)
                .sessionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();

        client.start();

        String path = "/orderNo";
        InterProcessMutex mutex = new InterProcessMutex(client, path);


        Stat stat = client.checkExists().forPath(path);
        if (stat == null) {
            client.create().forPath(path, "1000".getBytes());
        } else {
            client.setData().forPath(path, "1000".getBytes());
        }

        for (int i = 0; i < 10; i++) {

            new Thread(new Runnable() {
                @Override
                public void run() {

                    try {
                        mutex.acquire();

                        String numStr = new String(client.getData().forPath(path));
                        int orderNum = Integer.parseInt(numStr);
                        String currentNum = orderNum + 1 + "";
                        client.setData().forPath(path, currentNum.getBytes());

                        System.out.println("currentNum = " + new String(client.getData().forPath(path)));

                        mutex.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            
        }
    }
}

源碼分析:

  1. 初始化InterProcessMutex
  1. 調(diào)用InterProcessMutexacquire方法

實(shí)際上調(diào)用的是internalLock方法
internalLock --》LockInternalsattemptLock --》createsTheLockinternalLockLoop

StandardLockInternalsDrivercreatesTheLock方法:創(chuàng)建臨時(shí)順序節(jié)點(diǎn)

    @Override
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        if ( lockNodeBytes != null )
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        }
        else
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容