Zookeeper入門之五-常見應(yīng)用場景及代碼

Master選舉實(shí)現(xiàn)

思路:選擇一個根節(jié)點(diǎn),例如/master_select,多臺機(jī)器同時向該節(jié)點(diǎn)創(chuàng)建一個子節(jié)點(diǎn) /master_select/lock,利用zk的特性,最終只有一臺機(jī)器能夠創(chuàng)建成功,這臺機(jī)器就是master

 static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .retryPolicy(new ExponentialBackoffRetry(1000,3))
            .namespace("master_select")
            .build();

    public static void main(String[] args) throws InterruptedException {

        zkFluentClient.start();

        String selectPath = "/master_select";

        LeaderSelector selector = new LeaderSelector(zkFluentClient, selectPath, new LeaderSelectorListenerAdapter() {
            @Override
            // 需要注意的是,一旦執(zhí)行完這個方法,curator就會立即釋放Master的權(quán)利,然后重新開始新一輪的Master選舉
            public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                System.out.println("Be a Leader");
                TimeUnit.SECONDS.sleep(3);
                System.out.println("釋放 Leader ");
            }
        });

        selector.autoRequeue();

        selector.start();

        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);

    }

如果同時有2個請求,可以看到交替執(zhí)行,創(chuàng)建2個臨時節(jié)點(diǎn):

[zk: localhost:2181(CONNECTED) 21] ls /master_select/master_select
[_c_93265fd6-4b11-4668-baf8-e4211a8d1b5f-lock-0000000067, _c_36a0c859-efb1-442a-9dff-26121e7a1a7e-lock-0000000068]

這里的臨時節(jié)點(diǎn),在master失效的時候就會被刪除。

一旦takeLeaderShip執(zhí)行結(jié)束,master的就會被釋放,然后重新開始新一輪的master選舉。

分布式鎖

使用InterProcessMutex來做分布式鎖處理

public class DistributeLockTest {

    static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .namespace("lock")
            .build();

    public static void main(String[] args) {
        zkFluentClient.start();

        final InterProcessMutex lock = new InterProcessMutex(zkFluentClient, "/distribute_lock");

        final CountDownLatch latch = new CountDownLatch(1);

        for (int i = 0; i < 30; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {

                    try {
                        latch.await();
                        lock.acquire(); // 獲取鎖

                        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                        String orderNo = sdf.format(Date.from(Instant.now()));
                        System.out.println("OrderNo is:" + orderNo);

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            lock.release();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }


                }
            }).start();
        }

        latch.countDown();// 這里很有意思,在主線程啟動了幾十個線程之后,這些線程都是hold住的(通過 countDownLatch.await()方法)
        // 然后主線程處理latch.countDown(),導(dǎo)致所有子線程同時滿足觸發(fā)條件,同時執(zhí)行,保證并發(fā)。不過僅用在測試環(huán)節(jié)比較合適。
        // 其實(shí)latch可以去掉,只是這樣并發(fā)沒有那么集中。
    }
}
分布式計數(shù)器

思路很類似,用上述分布式鎖的思路。

比如統(tǒng)計在線人數(shù),指定zk的一個數(shù)據(jù)節(jié)點(diǎn)作為計數(shù)器,多個應(yīng)用實(shí)例在分布式鎖的控制下,通過更新該數(shù)據(jù)節(jié)點(diǎn)的內(nèi)容來實(shí)現(xiàn)計數(shù)功能。

public class DistributeCounterTest {

    static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
            .namespace("counter")
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .retryPolicy(new ExponentialBackoffRetry(800, 5))
            .build();

    public static void main(String[] args) throws Exception {

        zkFluentClient.start();

        // 計數(shù)器
        DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(zkFluentClient, "/adder",
                new RetryNTimes(3, 1000));

        AtomicValue<Integer> rc = atomicInteger.add(8);
        rc = atomicInteger.increment();
        rc = atomicInteger.decrement();
        atomicInteger.increment();

        System.out.println("Result:" + rc.succeeded());
        System.out.println("preValue:" + rc.preValue() + ",postValue:" + rc.postValue());
        System.out.println();

        String value = new String(zkFluentClient.getData().forPath("/adder"));
        System.out.println(value);

        // 試著重新取
        DistributedAtomicInteger newAtomicInteger = new DistributedAtomicInteger(zkFluentClient, "/adder", new RetryNTimes(3, 800));
        System.out.println(newAtomicInteger.get().preValue() + "_" + newAtomicInteger.get().postValue());
    }
}

可以看到,只要同一個路徑下,對應(yīng)的DistributeAtomicInteger的對象值都是同一個,可以隨時創(chuàng)建一個對象直接使用。

分布式Barrier

先看一個JDK自帶的CyclicBarrier,先看下CyclicBarrier的說明:

CyclicBarrier 的字面意思是可循環(huán)(Cyclic)使用的屏障(Barrier)。它要做的事情是,讓一組線程到達(dá)一個屏障(也可以叫同步點(diǎn))時被阻塞,直到最后一個線程到達(dá)屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續(xù)干活。線程進(jìn)入屏障通過CyclicBarrier的await()方法。

CyclicBarrier默認(rèn)的構(gòu)造方法是CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量,每個線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。

實(shí)現(xiàn)原理:在CyclicBarrier的內(nèi)部定義了一個Lock對象,每當(dāng)一個線程調(diào)用CyclicBarrier的await方法時,將剩余攔截的線程數(shù)減1,然后判斷剩余攔截數(shù)是否為0,如果不是,進(jìn)入Lock對象的條件隊(duì)列等待。如果是,執(zhí)行barrierAction對象的Runnable方法,然后將鎖的條件隊(duì)列中的所有線程放入鎖等待隊(duì)列中,這些線程會依次的獲取鎖、釋放鎖,接著先從await方法返回,再從CyclicBarrier的await方法中返回。

CyclicBarrier主要用于一組線程之間的相互等待,而CountDownLatch一般用于一組線程等待另一組線程。實(shí)際上可以通過CountDownLatch的countDown()和await()來實(shí)現(xiàn)CyclicBarrier的功能。即 CountDownLatch中的countDown()+await() = CyclicBarrier中的await()。注意:在一個線程中先調(diào)用countDown(),然后調(diào)用await()。

先看代碼

public class DistributeCyclicBarrierTest {

    static CyclicBarrier jdkBarrier = new CyclicBarrier(3);
    

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.execute(new Thread(new JdkBasedRuner("jinsiyu")));
        executorService.execute(new Thread(new JdkBasedRuner("AMANDA")));
        executorService.execute(new Thread(new JdkBasedRuner("QQ")));
    }

    static class JdkBasedRuner implements Runnable {

        private String name;

        public JdkBasedRuner(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println(name + " Ready!!!");

            try {
                jdkBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

            System.out.println(name + " GO!!");
        }
    }
}

結(jié)果如下:

jinsiyu Ready!!!
AMANDA Ready!!!
QQ Ready!!!

QQ GO!!
jinsiyu GO!!
AMANDA GO!!

可以看到,只有當(dāng)CyclicBarrier中的值為0時,才會統(tǒng)一執(zhí)行其后的操作,也就是“XXX GO”的語句打印。

而如果這里jdkBarrier如果設(shè)置的為4,那么下面三句“XXX GO”的語句根本不會打印,會一直等待。

ZK下的實(shí)現(xiàn):

    static DistributedBarrier distributedBarrier;

    static CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectionTimeoutMs(3000)
            .sessionTimeoutMs(5000)
            .connectString("localhost:32770")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .namespace("cyclicBarrier").build();

.......
        zkClient.start();
                // distribute
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    distributedBarrier = new DistributedBarrier(zkClient, "/barrier");
                    System.out.println(Thread.currentThread().getName() + "號barrier設(shè)置");
                    try {
                        distributedBarrier.setBarrier(); // 看實(shí)現(xiàn),就是在create節(jié)點(diǎn)
                        distributedBarrier.waitOnBarrier(); // 等待,直到remove
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "Starting...");
                }
            }).start();
        }
        TimeUnit.SECONDS.sleep(2);
        distributedBarrier.removeBarrier(); // delete節(jié)點(diǎn)

ZKPaths & EnsurePath
public class ZKPathsTest {

    static String path = "/zkpath_sample";

    static CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .retryPolicy(new ExponentialBackoffRetry(1000,3))
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .build();

    public static void main(String[] args) throws Exception {

        zkClient.start();

        System.out.println(ZKPaths.fixForNamespace(path,"/sub"));
        System.out.println(ZKPaths.makePath(path,"/sub"));
        System.out.println(ZKPaths.getNodeFromPath("/zkpath_sample/sub1")); // 不存在節(jié)點(diǎn)也不會報錯,從路徑str中截取


        ZKPaths.PathAndNode pn = ZKPaths.getPathAndNode("/zkpath_sample/sub1"); // 獲取節(jié)點(diǎn),不存在也不會報錯,只是從路徑上截取
        System.out.println(pn.getPath());
        System.out.println(pn.getNode());

        // 獲取zookeeper,這個是干啥的?
        ZooKeeper zooKeeper = zkClient.getZookeeperClient().getZooKeeper();

        String dir1 = path + "/child1";
        String dir2 = path + "/child2";
        ZKPaths.mkdirs(zooKeeper,dir1); // 創(chuàng)建目錄,如果存在不會報錯,也不會拋異常
        ZKPaths.mkdirs(zooKeeper,dir2);

        System.out.println(ZKPaths.getSortedChildren(zooKeeper,path)); // 獲取已排序的子節(jié)點(diǎn)

        ZKPaths.deleteChildren(zooKeeper,path,false); // 刪除子節(jié)點(diǎn),如果最后一個參數(shù)為true,會刪除本身
    }
}
public class EnsurePathTest {

    static String path = "/path2";

    static CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectionTimeoutMs(3000)
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000,3))
            .connectString("localhost:32770")
            .namespace("ensure")
            .build();

    public static void main(String[] args) throws Exception {

        zkClient.start();

        EnsurePath ensurePath = new EnsurePath(path);  // 這里跟namespace沒關(guān)系,只會從根目錄下開始建,所以是絕對路徑了
        ensurePath.ensure(zkClient.getZookeeperClient());

        EnsurePath ensurePath1 = zkClient.newNamespaceAwareEnsurePath("/c2"); // 用這個方法,namespace生效
        ensurePath1.ensure(zkClient.getZookeeperClient());
    }

}

不過EnsurePath貌似已經(jīng)不推薦使用了。

順序節(jié)點(diǎn)
    static CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .namespace("sequence-jin")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {

        zkClient.start();
        // 創(chuàng)建順序節(jié)點(diǎn)
        zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
                .forPath("/seq");
    }

執(zhí)行多次后,結(jié)果如下:

ls /sequence-jin
[seq0000000001, seq0000000000, seq0000000002]

可以看到,順序節(jié)點(diǎn)。。。就是如此

可以通過臨時節(jié)點(diǎn)來代替心跳,來判斷client端是否存在。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容