[ZooKeeper]基于Java API 實(shí)踐

前提

建立maven項(xiàng)目中 要導(dǎo)入zookeeper的依賴

<dependency>
     <groupId>org.apache.zookeeper</groupId>
     <artifactId>zookeeper</artifactId>
     <version>3.4.8</version>
</dependency>

我們同時(shí)可以打開(kāi)linux中的zookeeper客戶端來(lái)驗(yàn)證對(duì)比。輸入 zkCli.sh 便可以進(jìn)入zookeeper客戶端 。

一、建立連接

直接建立連接后,不進(jìn)行等待判斷 運(yùn)行結(jié)果為連接中(CONNECTING)。

清單1 連接中 zookeeper
    // 一、沒(méi)有連接成功
    public static void main(String[] args) {

        // zookeeper集群 ip  :客戶端口
        String kvm="192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181";

        try {
            ZooKeeper zooKeeper=new ZooKeeper(kvm,4000,null);
       // CONNECTING  連接中   ==》根據(jù)zookeeper的四個(gè)狀態(tài),可知、;沒(méi)有連接成功
            System.out.println(zooKeeper.getState()); 
            zooKeeper.close();                          // 關(guān)閉連接
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

添加CountDownLatch同步工具類.

清單2 連接zookeeper成功
   // 二、連接成功   zookeeper
    /**
     * 同步工具類,它允許一個(gè)或多個(gè)線程一直等待,直到其他線程的操作執(zhí)行完后再執(zhí)行。
     */
    public static void main(String[] args) {
        final CountDownLatch countDownLatch=new CountDownLatch(1);
        String kvm="192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181";

        try {
            ZooKeeper zooKeeper=new ZooKeeper(kvm, 4000, new Watcher() {
                // process: 觀察者隊(duì)列
                @Override
                public void process(WatchedEvent watchedEvent) {
                    //  SyncConnected  :同步連接
                    if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
                        // 如果收到服務(wù)端的響應(yīng)時(shí)間,連接成功

                        countDownLatch.countDown();
                        System.out.println("建立連接成功");
                    }

                }
            });

            countDownLatch.await();
            System.out.println(zooKeeper.getState()); // CONNECTED  :連接成功
            zooKeeper.close();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

二、數(shù)據(jù)的增刪改查操作

基于代碼的增刪改查操作,我們可以去到zookeeper中的zkCli客戶端窗口中去驗(yàn)證。
進(jìn)入zkCli客戶端窗口:在bin目錄下 直接輸入 zkCli.sh 回車,便可進(jìn)入。

清單1 事務(wù)操作
 public static void main(String[] args) {

        try {
            final CountDownLatch countDownLatch=new CountDownLatch(1);
            java.lang.String kvm="192.168.0.11:2181,192.168.0.13:2181,192.168.0.12:2181";

            java.lang.String kvm2="192.168.0.85:2181";

            ZooKeeper zooKeeper =new ZooKeeper(kvm, 4000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {

                    if (Event.KeeperState.SyncConnected==watchedEvent.getState()){
                        countDownLatch.countDown();//如果收到了服務(wù)端的響應(yīng)時(shí)間,連接成功
                        System.out.println("zk 建立連接");
                    }
                }
            });
            countDownLatch.await();


            System.out.println(zooKeeper.getState());// connected  成功

            /**
             * zookeeper數(shù)據(jù)的增刪改查
             */
            // 1.新增節(jié)點(diǎn)
            zooKeeper.create("/zk-wcl","0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            Thread.sleep(1000);
            Stat stat=new Stat();// 狀態(tài)
            // 2.查看新增后的節(jié)點(diǎn)狀態(tài)
            byte[] bytes=zooKeeper.getData("/zk-wcl",null,stat);

            System.out.println(new String(bytes));
            // 3.修改節(jié)點(diǎn)
            zooKeeper.setData("/zk-wcl","1".getBytes(),stat.getVersion());

            // 4.查看修改后的節(jié)點(diǎn)狀態(tài)
            byte[] byte2=zooKeeper.getData("/zk-wcl",null,stat);
            System.out.println(new String(byte2));


            // 5.刪除節(jié)點(diǎn)
            zooKeeper.delete("/zk-wcl",stat.getVersion());


            zooKeeper.close();
            System.in.read();// 當(dāng)前進(jìn)程阻塞
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }


    }

三、事件機(jī)制

介紹

Watcher監(jiān)聽(tīng)機(jī)制是ZooKeeper中非常重要的特性,我們基于zookeeper上創(chuàng)建的節(jié)點(diǎn),可以對(duì)這些節(jié)點(diǎn)綁定監(jiān)聽(tīng)事件,比如可以監(jiān)聽(tīng)節(jié)點(diǎn)數(shù)據(jù)變更、節(jié)點(diǎn)刪除、子節(jié)點(diǎn)狀態(tài)變更等事件。通過(guò)這個(gè)事件機(jī)制,可以基于zookeeper實(shí)現(xiàn)分布式鎖、集群管理功能。

Watcher特性:當(dāng)數(shù)據(jù)發(fā)生變化的時(shí)候,zookeeper會(huì)產(chǎn)生一個(gè)Watcher事件,并且會(huì)發(fā)送到客戶端,但是客戶端只會(huì)接受一次通知。如果后續(xù)這個(gè)節(jié)點(diǎn)再次發(fā)生變化,那么之前的設(shè)置watcher的客戶端不會(huì)再次受到消息。

watcher是一次性操作??梢酝ㄟ^(guò)循環(huán)監(jiān)聽(tīng)去達(dá)到永久監(jiān)聽(tīng)效果。

1、如何注冊(cè)事件機(jī)制

分為兩步:綁定事件、觸發(fā)事件。
第一步:通過(guò)getDate、exists 、getChildren這三個(gè)操作來(lái)綁定事件。
第二步:凡是事務(wù)類型的操作,都會(huì)觸發(fā)監(jiān)聽(tīng)事件。
事務(wù)操作:create 、``delete````、 setData

清單1:創(chuàng)建刪除節(jié)點(diǎn)的事件注冊(cè)監(jiān)聽(tīng)實(shí)例
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        /*------------------基于Java API 建立連接---------------------------*/
        final CountDownLatch countDownLatch=new CountDownLatch(1);
        java.lang.String kvm="192.168.0.11:2181,192.168.0.13:2181,192.168.0.12:2181";

        java.lang.String kvm2="192.168.0.85:2181";
        // TODO: 這兒是全局的Watcher
        ZooKeeper zooKeeper =new ZooKeeper(kvm, 4000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // 默認(rèn)事件
                System.out.println("默認(rèn)事件:"+watchedEvent.getType());
                if (Event.KeeperState.SyncConnected==watchedEvent.getState()){
                    countDownLatch.countDown();//如果收到了服務(wù)端的響應(yīng)時(shí)間,連接成功
                    System.out.println("zk 建立連接");
                }
            }
        });
        countDownLatch.await();

        /*--------------如何注冊(cè)、監(jiān)聽(tīng)事件機(jī)制-------------------------*/
        // 一、去創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn)
        zooKeeper.create("/zk-test-wcl","1".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);


        /**
         * 二、針對(duì)這個(gè)節(jié)點(diǎn) 綁定事件  3 種方式
         * (exists getdata getchildren)
         * 解釋:
         * zooKeeper.exists(String path,Watcher watcher)中的參數(shù)
         * watcher為true時(shí),代表默認(rèn)觸發(fā)全局 zookeeper 中的內(nèi)部類
         *
         * 演示:
         * 我們以 exists 來(lái) 綁定事件
         */
        Stat stat=zooKeeper.exists("/zk-test-wcl", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println(watchedEvent.getType()+"-->"+watchedEvent.getPath());
                // TODO:NOTE ==> 因?yàn)閣atcher是一次性操作,只能看到setData操作帶來(lái)的變化,delete操作看不到變化。
                // TODO:NOTE ==> 所以,要在綁定一次事件,來(lái)持續(xù)監(jiān)聽(tīng)

                try {
                    // TODO :注意true 默認(rèn)用的是全局的watcher
                    zooKeeper.exists(watchedEvent.getPath(),true);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // 通過(guò)修改的事務(wù)類型操作來(lái)觸發(fā)監(jiān)聽(tīng)事件
        // TODO:1、setData
       stat = zooKeeper.setData("/zk-test-wcl","2".getBytes(),
                stat.getVersion());
        Thread.sleep(1000);
        // TODO: 2、delete
        zooKeeper.delete("/zk-test-wcl",stat.getVersion());
        System.in.read();
    }


2、watcher事件類型
事件類型 含義
None(-1) 客戶端連接狀態(tài)發(fā)生改變時(shí),會(huì)受到none事件
NodeCreated(1) 創(chuàng)建節(jié)點(diǎn)事件
NodeDeleted(2) 刪除節(jié)點(diǎn)事件
NodeDataChanged(3) 節(jié)點(diǎn)數(shù)據(jù)發(fā)生變更
NodeChildrenChanged(4) 子節(jié)點(diǎn)被創(chuàng)建、被刪除會(huì)發(fā)生事件觸發(fā)
3、什么樣的操作會(huì)產(chǎn)生什么類型的事件

待續(xù)。。。。。

4、事件的實(shí)現(xiàn)原理

以上內(nèi)容是關(guān)于zookeeper 的zkCli客戶端java API的實(shí)操。其實(shí)zookeeper還有一個(gè)客戶端Curator。Curator是對(duì)zookeeper原生Java API的封裝。下面來(lái)記錄下,我貼下練習(xí)的代碼。

四、zookeeper客戶端:Curator 在java API中的實(shí)操

文件圖.png

前提:

在pom.xml文件中導(dǎo)入依賴的Curator包

    <!--開(kāi)源客戶端 Curator 依賴包 start-->
      <dependency>
          <groupId>org.apache.curator</groupId>
          <artifactId>curator-framework</artifactId>
          <version>4.0.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.curator</groupId>
          <artifactId>curator-recipes</artifactId>
          <version>4.0.0</version>
      </dependency>
      <!--開(kāi)源客戶端 Curator 依賴包  end -->
1、獲取,創(chuàng)建,修改信息

建立連接并獲取curator節(jié)點(diǎn),注意,這個(gè)節(jié)點(diǎn)必須是已經(jīng)存在的。

清單1、客戶端的鏈接及獲取命名空間
 String kvm = "192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181";
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().
                connectString(kvm).sessionTimeoutMs(4000).
                retryPolicy(new ExponentialBackoffRetry(100, 3)).
                namespace("curator").build();
        curatorFramework.start();
清單2、創(chuàng)建一個(gè)多級(jí)節(jié)點(diǎn)

在原生Java API中創(chuàng)建節(jié)點(diǎn),必須逐層創(chuàng)建,即必須先存在父節(jié)點(diǎn),子節(jié)點(diǎn)才能創(chuàng)建。像上面中已經(jīng)介紹過(guò)的zookeeper zkCli客戶端。

現(xiàn)在在進(jìn)一步封裝的Curator客戶端,便可以直接創(chuàng)建一個(gè)多級(jí)節(jié)點(diǎn)。

curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).
                forPath("/wcl/node1","1".getBytes());
清單3、獲取信息并修改信息
 Stat stat = new Stat();
        curatorFramework.getData().storingStatIn(stat).forPath("/wcl/node1");
        curatorFramework.setData().withVersion(stat.getVersion()).forPath("/wcl/node1", "xx".getBytes());

        

注意:在main()方法的最后 要關(guān)閉。添加這句代碼:curatorFramework.close();

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容