目錄
概述
基于watch實現(xiàn)監(jiān)聽
?DEMO
基于cache實現(xiàn)監(jiān)聽
?Path Cache介紹
?Node Cache介紹
?Tree Cache介紹
?DEMO
1概述
在筆記一中已經(jīng)對Curator與原生客戶端的監(jiān)聽方式進行了介紹,本文主要介紹Curator對zk監(jiān)聽的實現(xiàn)。在maven中引入recipes包,Curator封裝Zookeeper的典型場景使用都放在了recipes包中。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
2 基于watch實現(xiàn)監(jiān)聽
利用Watcher來對節(jié)點進行監(jiān)聽操作,但此監(jiān)聽操作只能監(jiān)聽一次,與原生API并無太大差異。如有典型業(yè)務場景需要使用可考慮,但一般情況不推薦使用。下面是具體的使用demo。
public class CuratorWatchEvent {
public static CuratorFramework build(){
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString("172.30.241.205:2181")
.namespace(ZKConstant.ZK_NAMESPACE)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
curatorFramework.start();
return curatorFramework;
}
public static void main(String[] args) throws Exception{
String path = "/watchtest";
CuratorFramework client = CuratorWatchEvent.build();
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path);
build().getData().usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("監(jiān)聽器watchedEvent:" + watchedEvent);
}
}).forPath(path);
client.setData().forPath(path,"new content".getBytes());
// 第二次變更節(jié)點數(shù)據(jù)
client.setData().forPath(path,"second content".getBytes());
client.close();
}
}
執(zhí)行結(jié)果:
監(jiān)聽器watchedEvent:WatchedEvent state:SyncConnected type:NodeDataChanged path:/watchtest
基于cache實現(xiàn)監(jiān)聽
cache是一種緩存機制,Cache事件監(jiān)聽可以理解為一個本地緩存視圖與遠程Zookeeper視圖的對比過程。Cache提供了反復注冊的功能。
curator支持的cache種類有3種
- Path Cache
- Node Cache
- Tree Cache
Path Cache
Path Cache用來觀察ZNode的子節(jié)點并緩存狀態(tài),如果ZNode的子節(jié)點被創(chuàng)建,更新或者刪除,那么Path Cache會更新緩存,并且觸發(fā)事件給注冊的監(jiān)聽器。
Path Cache是通過PathChildrenCache類來實現(xiàn)的,監(jiān)聽器注冊是通過PathChildrenCacheListener。
Node Cache
Node Cache用來觀察ZNode自身,如果ZNode節(jié)點本身被創(chuàng)建,更新或者刪除,那么Node Cache會更新緩存,并觸發(fā)事件給注冊的監(jiān)聽器。
Node Cache是通過NodeCache類來實現(xiàn)的,監(jiān)聽器對應的接口為NodeCacheListener。
Tree Cache
可以看做是上兩種的合體,Tree Cache觀察的是ZNode及子節(jié)點。
DEMO
public class CacheListenerDemo {
public static CuratorFramework build(){
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString("172.30.241.205:2181")
.namespace(ZKConstant.ZK_NAMESPACE)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
curatorFramework.start();
return curatorFramework;
}
public static void main(String[] args) throws Exception{
try {
String testPath="pathChildrenCacheTest";
//創(chuàng)建連接
CuratorFramework client= CacheListenerDemo.build();
//如果testPath存在,刪除路徑
Stat stat = client.checkExists().forPath("/"+testPath);
if(stat != null)
{
client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/"+testPath);
}
//創(chuàng)建testPath
client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/"+testPath,testPath.getBytes());
//創(chuàng)建PathChildrenCache
//參數(shù):true代表緩存數(shù)據(jù)到本地
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/" + testPath,true);
//BUILD_INITIAL_CACHE 代表使用同步的方式進行緩存初始化。
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
pathChildrenCache.getListenable().addListener((cf, event) -> {
PathChildrenCacheEvent.Type eventType = event.getType();
switch (eventType) {
case CONNECTION_RECONNECTED:
pathChildrenCache.rebuild();
break;
case CONNECTION_SUSPENDED:
break;
case CONNECTION_LOST:
System.out.println("Connection lost");
break;
case CHILD_ADDED:
System.out.println("Child added");
break;
case CHILD_UPDATED:
System.out.println("Child updated");
break;
case CHILD_REMOVED:
System.out.println("Child removed");
break;
default:
}
});
//創(chuàng)建子節(jié)點1
client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/"+testPath+"/1",testPath.getBytes());
Thread.sleep(1000);
//創(chuàng)建子節(jié)點1
client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/"+testPath+"/2",testPath.getBytes());
Thread.sleep(1000);
//刪除子節(jié)點1
client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/"+testPath+"/1");
Thread.sleep(1000);
//刪除子節(jié)點2
client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/"+testPath+"/2");
Thread.sleep(1000);
pathChildrenCache.close();
} catch (Exception e) {
e.printStackTrace();
// TODO: handle exception
}
}
}