Curator之監(jiān)聽

目錄
概述
基于watch實現(xiàn)監(jiān)聽
?DEMO
基于cache實現(xiàn)監(jiān)聽
?Path Cache介紹
?Node Cache介紹
?Tree Cache介紹
?DEMO


1概述

在筆記一中已經(jīng)對Curator與原生客戶端的監(jiān)聽方式進行了介紹,本文主要介紹Curator對zk監(jiān)聽的實現(xiàn)。在maven中引入recipes包,Curator封裝Zookeeper的典型場景使用都放在了recipes包中。

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>
2 基于watch實現(xiàn)監(jiān)聽

利用Watcher來對節(jié)點進行監(jiān)聽操作,但此監(jiān)聽操作只能監(jiān)聽一次,與原生API并無太大差異。如有典型業(yè)務場景需要使用可考慮,但一般情況不推薦使用。下面是具體的使用demo。

public class CuratorWatchEvent {
    public static CuratorFramework build(){
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString("172.30.241.205:2181")
                .namespace(ZKConstant.ZK_NAMESPACE)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        curatorFramework.start();
        return curatorFramework;
    }

    public static void main(String[] args) throws Exception{
        String path = "/watchtest";
        CuratorFramework client  = CuratorWatchEvent.build();
        client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path);
        build().getData().usingWatcher(new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("監(jiān)聽器watchedEvent:" + watchedEvent);
            }
        }).forPath(path);

        client.setData().forPath(path,"new content".getBytes());

        // 第二次變更節(jié)點數(shù)據(jù)
        client.setData().forPath(path,"second content".getBytes());
        client.close();
    }
}

執(zhí)行結(jié)果:

監(jiān)聽器watchedEvent:WatchedEvent state:SyncConnected type:NodeDataChanged path:/watchtest
基于cache實現(xiàn)監(jiān)聽

cache是一種緩存機制,Cache事件監(jiān)聽可以理解為一個本地緩存視圖與遠程Zookeeper視圖的對比過程。Cache提供了反復注冊的功能。
curator支持的cache種類有3種

  • Path Cache
  • Node Cache
  • Tree Cache
Path Cache

Path Cache用來觀察ZNode的子節(jié)點并緩存狀態(tài),如果ZNode的子節(jié)點被創(chuàng)建,更新或者刪除,那么Path Cache會更新緩存,并且觸發(fā)事件給注冊的監(jiān)聽器。

Path Cache是通過PathChildrenCache類來實現(xiàn)的,監(jiān)聽器注冊是通過PathChildrenCacheListener。

Node Cache

Node Cache用來觀察ZNode自身,如果ZNode節(jié)點本身被創(chuàng)建,更新或者刪除,那么Node Cache會更新緩存,并觸發(fā)事件給注冊的監(jiān)聽器。

Node Cache是通過NodeCache類來實現(xiàn)的,監(jiān)聽器對應的接口為NodeCacheListener。

Tree Cache

可以看做是上兩種的合體,Tree Cache觀察的是ZNode及子節(jié)點。

DEMO
public class CacheListenerDemo {
    public static CuratorFramework build(){
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString("172.30.241.205:2181")
                .namespace(ZKConstant.ZK_NAMESPACE)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        curatorFramework.start();
        return curatorFramework;
    }

    public static void main(String[] args) throws Exception{
        try {
            String testPath="pathChildrenCacheTest";
            //創(chuàng)建連接
            CuratorFramework client= CacheListenerDemo.build();
            //如果testPath存在,刪除路徑
            Stat stat = client.checkExists().forPath("/"+testPath);
            if(stat != null)
            {
                client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/"+testPath);
            }
            //創(chuàng)建testPath
            client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/"+testPath,testPath.getBytes());

            //創(chuàng)建PathChildrenCache
            //參數(shù):true代表緩存數(shù)據(jù)到本地
            PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/" + testPath,true);
            //BUILD_INITIAL_CACHE 代表使用同步的方式進行緩存初始化。
            pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
            pathChildrenCache.getListenable().addListener((cf, event) -> {
                PathChildrenCacheEvent.Type eventType = event.getType();
                switch (eventType) {
                    case CONNECTION_RECONNECTED:
                        pathChildrenCache.rebuild();
                        break;
                    case CONNECTION_SUSPENDED:
                        break;
                    case CONNECTION_LOST:
                        System.out.println("Connection lost");
                        break;
                    case CHILD_ADDED:
                        System.out.println("Child added");
                        break;
                    case CHILD_UPDATED:
                        System.out.println("Child updated");
                        break;
                    case CHILD_REMOVED:
                        System.out.println("Child removed");
                        break;
                    default:
                }
            });

            //創(chuàng)建子節(jié)點1
            client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/"+testPath+"/1",testPath.getBytes());
            Thread.sleep(1000);
            //創(chuàng)建子節(jié)點1
            client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/"+testPath+"/2",testPath.getBytes());
            Thread.sleep(1000);
            //刪除子節(jié)點1
            client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/"+testPath+"/1");
            Thread.sleep(1000);
            //刪除子節(jié)點2
            client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/"+testPath+"/2");
            Thread.sleep(1000);

            pathChildrenCache.close();
        } catch (Exception e) {
            e.printStackTrace();
            // TODO: handle exception
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容