3、zk客戶端curator使用(轉(zhuǎn))

一、curator簡介

  • Curator是Netflix公司開源的一套zookeeper客戶端框架,解決了很多Zookeeper客戶端非常底層的細(xì)節(jié)開發(fā)工作,包括連接重連、反復(fù)注冊Watcher和NodeExistsException異常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”給Curator予高度評價。

curator-framework:對zookeeper的底層api的一些封裝
curator-client:提供一些客戶端的操作,例如重試策略等
curator-recipes:封裝了一些高級特性,如:Cache事件監(jiān)聽、選舉、分布式鎖、分布式計數(shù)器、分布式Barrier等

  • 版本問題。使用curator的版本:2.12.0,對應(yīng)Zookeeper的版本為:3.4.x,如果跨版本會有兼容性問題,很有可能導(dǎo)致節(jié)點操作失敗
<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

二、Curator-framework基本api使用

1、創(chuàng)建會話

(1)靜態(tài)工廠方法創(chuàng)建會話

String connectionInfo = "127.0.0.1:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory
                .newClient(connectionInfo, 5000, 3000, retryPolicy);

newClient靜態(tài)工廠方法包含四個主要參數(shù):


靜態(tài)工廠參數(shù)

(2)使用fluent流式創(chuàng)建

  RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();

(3)創(chuàng)建包含命名空間的會話

  • 為了實現(xiàn)不同的Zookeeper業(yè)務(wù)之間的隔離,需要為每個業(yè)務(wù)分配一個獨立的命名空間(NameSpace),即指定一個Zookeeper的根路徑(官方術(shù)語:為Zookeeper添加“Chroot”特性)。
 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                #創(chuàng)建命名空間為namespace的會話(該會話內(nèi)的操作都是基于該目錄進(jìn)行的)
                .namespace("namespace")
                .build();
2、啟動客戶端
client.start();
3、創(chuàng)建數(shù)據(jù)節(jié)點
  • Zookeeper的節(jié)點創(chuàng)建模式:

PERSISTENT:持久化
PERSISTENT_SEQUENTIAL:持久化并且?guī)蛄刑?br> EPHEMERAL:臨時
EPHEMERAL_SEQUENTIAL:臨時并且?guī)蛄刑?/p>

//創(chuàng)建一個節(jié)點,指定創(chuàng)建模式(臨時節(jié)點),附帶初始化內(nèi)容,并且自動遞歸創(chuàng)建父節(jié)點
client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());
4、刪除節(jié)點
//刪除一個節(jié)點,并且遞歸刪除其所有的子節(jié)點
client.delete().deletingChildrenIfNeeded().forPath("path");
//刪除一個節(jié)點,強(qiáng)制指定版本進(jìn)行刪除
client.delete().withVersion(10086).forPath("path");
//刪除一個節(jié)點,強(qiáng)制保證刪除,guaranteed()接口是一個保障措施,只要客戶端會話有效,
//那么Curator會在后臺持續(xù)進(jìn)行刪除操作,直到刪除節(jié)點成功。
client.delete().guaranteed().forPath("path");

5、讀取數(shù)據(jù)
//讀取一個節(jié)點的數(shù)據(jù)內(nèi)容,同時獲取到該節(jié)點的stat
讀取一個節(jié)點的數(shù)據(jù)內(nèi)容,同時獲取到該節(jié)點的stat
6、更新節(jié)點數(shù)據(jù)
//更新一個節(jié)點的數(shù)據(jù)內(nèi)容,該接口會返回一個Stat實例
 Stat path = client.setData().forPath("path", "data".getBytes());
//更新一個節(jié)點的數(shù)據(jù)內(nèi)容,強(qiáng)制指定版本進(jìn)行更新
client.setData().withVersion(10086).forPath("path","data".getBytes());
7、檢測節(jié)點是否存在
 Stat stat = client.checkExists().forPath("path");
8、獲取某個節(jié)點的所有子節(jié)點路徑
//該方法的返回值為List<String>,獲得ZNode的子節(jié)點Path列表。 可以調(diào)用額外的方法(監(jiān)控、后臺處
//理或者獲取狀態(tài)watch, background or get stat) 并在最后調(diào)用forPath()指定要操作的父ZNode
client.getChildren().forPath("path");
9、事物
  • CuratorFramework的實例包含inTransaction( )接口方法,調(diào)用此方法開啟一個ZooKeeper事務(wù). 可以復(fù)合create, setData, check, and/or delete 等操作然后調(diào)用commit()作為一個原子操作提交。一個例子如下:
client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();
10、異步接口
  • 上面提到的創(chuàng)建、刪除、更新、讀取等方法都是同步的,Curator提供異步接口,引入了BackgroundCallback接口用于處理異步接口調(diào)用之后服務(wù)端返回的結(jié)果信息。BackgroundCallback接口中一個重要的回調(diào)值為CuratorEvent,里面包含事件類型、響應(yīng)嗎和節(jié)點的詳細(xì)信息。
  • CuratorEventType:


    CuratorEventType對應(yīng)的事件類型
  • 響應(yīng)碼(#getResultCode()):


    狀態(tài)響應(yīng)碼
//異步的創(chuàng)建節(jié)點
   Executor executor = Executors.newFixedThreadPool(2);
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .inBackground((curatorFramework, curatorEvent) -> {
                    System.out.println(String.format("eventType:%s,resultCode:%s", 
                            curatorEvent.getType(), curatorEvent.getResultCode()));
                }, executor)
                .forPath("path");

二、Curator-recipes實現(xiàn)高級特征

1、緩存cache
  • Zookeeper原生支持通過注冊Watcher來進(jìn)行事件監(jiān)聽,但是開發(fā)者需要反復(fù)注冊(Watcher只能單次注冊單次使用)。Cache是Curator中對事件監(jiān)聽的包裝,可以看作是對事件監(jiān)聽的本地緩存視圖,能夠自動為開發(fā)者處理反復(fù)注冊監(jiān)聽。Curator提供了三種Watcher(Cache)來監(jiān)聽結(jié)點的變化。

(1)Path Cache

  • Path Cache用來監(jiān)控一個ZNode的子節(jié)點. 當(dāng)一個子節(jié)點增加, 更新,刪除時, Path Cache會改變它的狀態(tài), 會包含最新的子節(jié)點, 子節(jié)點的數(shù)據(jù)和狀態(tài),而狀態(tài)的更變將通過PathChildrenCacheListener通知。涉及到的四個類:

PathChildrenCache、PathChildrenCacheEvent、PathChildrenCacheListener、ChildData

//1、構(gòu)造函數(shù)
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
//2、想使用cache,必須調(diào)用它的start方法,使用完后調(diào)用close方法??梢栽O(shè)置StartMode來實現(xiàn)啟動的模式,NORMAL:正常初始化。BUILD_INITIAL_CACHE:在調(diào)用start()之前會調(diào)用rebuild()。POST_INITIALIZED_EVENT: 當(dāng)Cache初始化數(shù)據(jù)后發(fā)送一個PathChildrenCacheEvent.Type#INITIALIZED事件
//3、可以增加listener監(jiān)聽緩存的變化。
public void addListener(PathChildrenCacheListener listener)
//4、遍歷所有的子節(jié)點
getCurrentData()

 private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        //1.創(chuàng)建client并啟動
        String connectionInfo = "127.0.0.1:2181";
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory
                .newClient(connectionInfo, 5000, 3000, retryPolicy);
        client.start();
        //2.創(chuàng)建path cache并啟動
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        //3、進(jìn)行事件監(jiān)聽
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件類型:" + event.getType());
            //如果new PathChildrenCache(client, PATH, true)中的參數(shù)cacheData值設(shè)置為false,則示例中的
            // event.getData().getData()、data.getData()將返回null,cache將不會緩存節(jié)點數(shù)據(jù)。
            if (null != event.getData()) {
                System.out.println("節(jié)點數(shù)據(jù):" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        //4、進(jìn)行響應(yīng)的curd操作,看看事件回調(diào)結(jié)果
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }

(2)Node Cache

  • NodeCache只能監(jiān)聽一個節(jié)點的狀態(tài)變化。
  • Node Cache與Path Cache類似,Node Cache只是監(jiān)聽某一個特定的節(jié)點。它涉及到下面的三個類:

NodeCache - Node Cache實現(xiàn)類
NodeCacheListener - 節(jié)點監(jiān)聽器
ChildData - 節(jié)點數(shù)據(jù)

 client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("節(jié)點數(shù)據(jù):" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("節(jié)點被刪除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");

(3)Tree Cache

  • Tree Cache可以監(jiān)控整個樹上的所有節(jié)點,類似于PathCache和NodeCache的組合,主要涉及到下面四個類:

TreeCache - Tree Cache實現(xiàn)類
TreeCacheListener - 監(jiān)聽器類
TreeCacheEvent - 觸發(fā)的事件類
ChildData - 節(jié)點數(shù)據(jù)

  • 注意:TreeCache在初始化(調(diào)用start()方法)的時候會回調(diào)TreeCacheListener實例一個事TreeCacheEvent,而回調(diào)的TreeCacheEvent對象的Type為INITIALIZED,ChildData為null,此時event.getData().getPath()很有可能導(dǎo)致空指針異常,這里應(yīng)該主動處理并避免這種情況。
 client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("事件類型:" + event.getType() +
                        " | 路徑:" + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
2、leader選舉
  • Curator 有兩種leader選舉的recipe,分別是LeaderSelector和LeaderLatch。前者是所有存活的客戶端不間斷的輪流做Leader,大同社會。后者是一旦選舉出Leader,除非有客戶端掛掉重新觸發(fā)選舉,否則不會交出領(lǐng)導(dǎo)權(quán)。
    (1)LeaderLatch
public class LeaderLatchTest {
    /**
     * 1、一旦啟動,LeaderLatch會和其它使用相同latch path的其它LeaderLatch交涉,然后其中一個最終會被選舉為leader,
     * 2、可以通過hasLeadership方法查看LeaderLatch實例是否leader:
     * 3、 LeaderLatch在請求成為leadership會block(阻塞),一旦不使用LeaderLatch了,必須調(diào)用close方法。
     * 如果它是leader,會釋放leadership, 其它的參與者將會選舉一個leader。
     * 4、 錯誤處理:LeaderLatch實例可以增加ConnectionStateListener來監(jiān)聽網(wǎng)絡(luò)連接問題。 當(dāng) SUSPENDED 或 LOST 時,
     * leader不再認(rèn)為自己還是leader。當(dāng)LOST后連接重連后RECONNECTED,LeaderLatch會刪除先前的ZNode然后重新
     * 創(chuàng)建一個。LeaderLatch用戶必須考慮導(dǎo)致leadership丟失的連接問題。
     * 強(qiáng)烈推薦你使用ConnectionStateListener
     */

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;

    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        String connectionInfo = "127.0.0.1:2181";
        try {
            //1、先創(chuàng)建10個leaderLatch
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(connectionInfo, new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {
                    @Override
                    public void isLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                //2、啟動后,選中一個作為leader
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            //3、通過hasLeadership查看自己是否是leader, 如果是的話返回true。
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            //4、close釋放當(dāng)前的領(lǐng)導(dǎo)權(quán)。
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState() && !latch.getState().equals(LeaderLatch.State.CLOSED))
                    latch.close();
            }
            for (CuratorFramework client : clients) {
                client.close();
            }
        }
    }
}

(2)LeaderSelector

  • LeaderSelector使用的時候主要涉及下面幾個類:

LeaderSelector
LeaderSelectorListener
LeaderSelectorListenerAdapter
CancelLeadershipException

  • 類似LeaderLatch,LeaderSelector必須start: leaderSelector.start(); 一旦啟動,當(dāng)實例取得領(lǐng)導(dǎo)權(quán)時你的listener的takeLeadership()方法被調(diào)用。而takeLeadership()方法只有領(lǐng)導(dǎo)權(quán)被釋放時才返回。 當(dāng)你不再使用LeaderSelector實例時,應(yīng)該調(diào)用它的close方法。
public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    //使用AtomicInteger來記錄此client獲得領(lǐng)導(dǎo)權(quán)的次數(shù), 它是”fair”, 每個client有平等的機(jī)會獲得領(lǐng)導(dǎo)權(quán)。
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        //調(diào)用 leaderSelector.autoRequeue();保證在此實例釋放領(lǐng)導(dǎo)權(quán)之后還可能獲得領(lǐng)導(dǎo)權(quán)。
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    /**
     * leaderSelector.start(); 一旦啟動,當(dāng)實例取得領(lǐng)導(dǎo)權(quán)時你的listener的takeLeadership()方法被調(diào)用。
     * 而takeLeadership()方法只有領(lǐng)導(dǎo)權(quán)被釋放時才返回。
     * 你可以在takeLeadership進(jìn)行任務(wù)的分配等等,并且不要返回,
     * 如果你想要要此實例一直是leader的話可以加一個死循環(huán)。
     * @param client
     * @throws Exception
     */
    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}
public class LeaderSelectorDemo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        String connectionInfo = "127.0.0.1:2181";
        try {
            //1、構(gòu)建10個LeaderSelectorAdapter
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(connectionInfo, new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                //2、啟動,并開始選舉
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

(3)小結(jié)
對比可知,LeaderLatch必須調(diào)用close()方法才會釋放領(lǐng)導(dǎo)權(quán),而對于LeaderSelector,通過LeaderSelectorListener可以對領(lǐng)導(dǎo)權(quán)進(jìn)行控制, 在適當(dāng)?shù)臅r候釋放領(lǐng)導(dǎo)權(quán),這樣每個節(jié)點都有可能獲得領(lǐng)導(dǎo)權(quán)。從而,LeaderSelector具有更好的靈活性和可控性,建議有LeaderElection應(yīng)用場景下優(yōu)先使用LeaderSelector。

3、分布式鎖
  • 要點:

1.推薦使用ConnectionStateListener監(jiān)控連接的狀態(tài),因為當(dāng)連接LOST時你不再擁有鎖
2.分布式的鎖全局同步, 這意味著任何一個時間點不會有兩個客戶端都擁有相同的鎖。
(1)可重入共享鎖Shared Reentrant Lock

  • Shared意味著鎖是全局可見的, 客戶端都可以請求鎖。 Reentrant和JDK的ReentrantLock類似,即可重入, 意味著同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。 它是由類InterProcessMutex來實現(xiàn)。 它的構(gòu)造函數(shù)為:
public InterProcessMutex(CuratorFramework client, String path)
  • 通過acquire()獲得鎖,并提供超時機(jī)制;并通過release()釋放鎖
  • 請求撤銷當(dāng)前的鎖, 調(diào)用attemptRevoke()方法,注意鎖釋放時RevocationListener將會回調(diào)。
public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // 真實環(huán)境中我們會在這里訪問/維護(hù)一個共享的資源
        //這個例子在使用鎖的情況下不會非法并發(fā)異常IllegalStateException
        //但是在無鎖的情況由于sleep了一段時間,很容易拋出異常
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}
public class InterProcessMutexDemo {
    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        //加鎖、其他被阻塞
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            //解鎖
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        String connectionInfo = "127.0.0.1:2181";
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(connectionInfo, new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {

        }
    }
}

(2)不可重入共享鎖—Shared Lock

  • 這個鎖和上面的InterProcessMutex相比,就是少了Reentrant的功能,也就意味著它不能在同一個線程中重入,具體實現(xiàn)類為InterProcessSemaphoreMutex

(3)可重入讀寫鎖—Shared Reentrant Read Write Lock

  • 類似JDK的ReentrantReadWriteLock。一個讀寫鎖管理一對相關(guān)的鎖。一個負(fù)責(zé)讀操作,另外一個負(fù)責(zé)寫操作。讀操作在寫鎖沒被使用時可同時由多個進(jìn)程使用,而寫鎖在使用時不允許讀(阻塞)。此鎖是可重入的。一個擁有寫鎖的線程可重入讀鎖,但是讀鎖卻不能進(jìn)入寫鎖。這也意味著寫鎖可以降級成讀鎖, 比如請求寫鎖 --->請求讀鎖--->釋放讀鎖 ---->釋放寫鎖。從讀鎖升級成寫鎖是不行的??芍厝胱x寫鎖主要由兩個類實現(xiàn):InterProcessReadWriteLock、InterProcessMutex。使用時首先創(chuàng)建一個InterProcessReadWriteLock實例,然后再根據(jù)你的需求得到讀鎖或者寫鎖,讀寫鎖的類型是InterProcessMutex。
    (4)信號量—Shared Semaphore
  • 一個計數(shù)的信號量類似JDK的Semaphore。 JDK中Semaphore維護(hù)的一組許可(permits),而Curator中稱之為租約(Lease)。 有兩種方式可以決定semaphore的最大租約數(shù)。第一種方式是用戶給定path并且指定最大LeaseSize。第二種方式用戶給定path并且使用SharedCountReader類。如果不使用SharedCountReader, 必須保證所有實例在多進(jìn)程中使用相同的(最大)租約數(shù)量,否則有可能出現(xiàn)A進(jìn)程中的實例持有最大租約數(shù)量為10,但是在B進(jìn)程中持有的最大租約數(shù)量為20,此時租約的意義就失效了。
  • 這次調(diào)用acquire()會返回一個租約對象。 客戶端必須在finally中close這些租約對象,否則這些租約會丟失掉。 但是, 但是,如果客戶端session由于某種原因比如crash丟掉, 那么這些客戶端持有的租約會自動close, 這樣其它客戶端可以繼續(xù)使用這些租約。
  • 注意你可以一次性請求多個租約,如果Semaphore當(dāng)前的租約不夠,則請求線程會被阻塞。 同時還提供了超時的重載方法。
    (5)多共享鎖對象 —Multi Shared Lock
  • Multi Shared Lock是一個鎖的容器。 當(dāng)調(diào)用acquire(), 所有的鎖都會被acquire(),如果請求失敗,所有的鎖都會被release。 同樣調(diào)用release時所有的鎖都被release(失敗被忽略)。
4、分布式計數(shù)器
  • 計數(shù)器是用來計數(shù)的, 利用ZooKeeper可以實現(xiàn)一個集群共享的計數(shù)器。 只要使用相同的path就可以得到最新的計數(shù)器值, 這是由ZooKeeper的一致性保證的。Curator有兩個計數(shù)器, 一個是用int來計數(shù)(SharedCount),一個用long來計數(shù)(DistributedAtomicLong)。

(1)分布式int計數(shù)器—SharedCount

  • 這個類使用int類型來計數(shù)。 主要涉及三個類。SharedCount代表計數(shù)器, 可以為它增加一個SharedCountListener,當(dāng)計數(shù)器改變時此Listener可以監(jiān)聽到改變的事件,而SharedCountReader可以讀取到最新的值, 包括字面值和帶版本信息的值VersionedValue。

SharedCount
SharedCountReader
SharedCountListener

  • 注意計數(shù)器必須start,使用完之后必須調(diào)用close關(guān)閉它。
    (2)分布式long計數(shù)器—DistributedAtomicLong
  • 再看一個Long類型的計數(shù)器。 除了計數(shù)的范圍比SharedCount大了之外, 它首先嘗試使用樂觀鎖的方式設(shè)置計數(shù)器, 如果不成功(比如期間計數(shù)器已經(jīng)被其它client更新了), 它使用InterProcessMutex方式來更新計數(shù)值。此計數(shù)器有一系列的操作:

get(): 獲取當(dāng)前值
increment(): 加一
decrement(): 減一
add(): 增加特定的值
subtract(): 減去特定的值
trySet(): 嘗試設(shè)置計數(shù)值
forceSet(): 強(qiáng)制設(shè)置計數(shù)值

5、分布式隊列
  • 基本不會使用zk作為分布式隊列使用
  • Curator也提供ZK Recipe的分布式隊列實現(xiàn)。 利用ZK的 PERSISTENTS_EQUENTIAL節(jié)點, 可以保證放入到隊列中的項目是按照順序排隊的。 如果單一的消費(fèi)者從隊列中取數(shù)據(jù), 那么它是先入先出的,這也是隊列的特點。 如果你嚴(yán)格要求順序,你就的使用單一的消費(fèi)者,可以使用Leader選舉只讓Leader作為唯一的消費(fèi)者。
6、分布式屏障—Barrier
  • 分布式Barrier是這樣一個類: 它會阻塞所有節(jié)點上的等待進(jìn)程,直到某一個被滿足, 然后所有的節(jié)點繼續(xù)進(jìn)行,先關(guān)實現(xiàn)類DistributedBarrier

轉(zhuǎn)載:https://my.oschina.net/woter/blog/1933298

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容