ZK應(yīng)用場景
監(jiān)聽+回調(diào)—NodeCache
// 節(jié)點監(jiān)控測試
public class NodeWatcherTest {
static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
.connectString("localhost:32770")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.namespace("watcher")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
public static void main(String[] args) throws Exception {
String path = "/nodecache";
zkFluentClient.start();
// 創(chuàng)建節(jié)點
zkFluentClient.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes());
// 創(chuàng)建監(jiān)控watcher
final NodeCache nodeCache = new NodeCache(zkFluentClient, path);
nodeCache.start();// 開啟監(jiān)控
// 注冊對應(yīng)的listener
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
// 這里只是打印出來
System.out.println("Node Data updated,new Data:" + new String(nodeCache.getCurrentData().getData()));
}
});
// 做個實際的更新處理--set
zkFluentClient.setData().forPath(path,"update my data".getBytes());
TimeUnit.SECONDS.sleep(10);
// 清除對應(yīng)的數(shù)據(jù)
zkFluentClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
TimeUnit.MICROSECONDS.sleep(Integer.MAX_VALUE);
}
}
這里關(guān)鍵是NodeCache,理解這個東西。
new NodeCache第三個參數(shù),默認(rèn)為false。如果為true,那么NodeCache在第一次啟動的時候就會立刻從zk上讀取對應(yīng)節(jié)點的數(shù)據(jù)內(nèi)容,并保存在Cache中。
NodeCache不僅可以用于監(jiān)聽數(shù)據(jù)節(jié)點的內(nèi)容變更,也可以監(jiān)聽指定節(jié)點是否存在。如果原節(jié)點不存在,那么Cache就會在節(jié)點被創(chuàng)建后觸發(fā)listener。但是如果數(shù)據(jù)節(jié)點被刪除,就無法觸發(fā)listener。
子節(jié)點監(jiān)聽--PathChildrenCache
注意,無法監(jiān)聽二級子節(jié)點。
子節(jié)點變化有三種事件:CHILD_ADD\CHILD_UPDATED\CHILD_REMOVED
import java.util.concurrent.TimeUnit;
public class PathChildrenCacheTest {
static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
.connectString("localhost:32770")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.namespace("child-watcher")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
public static void main(String[] args) throws Exception {
zkFluentClient.start();
final String path = "/child3";
// 指定需要監(jiān)控的父目錄
final PathChildrenCache pathChildrenCache = new PathChildrenCache(zkFluentClient, path, true);
pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
// 注冊listener
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework,
PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
switch (pathChildrenCacheEvent.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED" + pathChildrenCacheEvent.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED" + pathChildrenCacheEvent.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED" + pathChildrenCacheEvent.getData().getPath());
break;
default:
break;
}
}
});
// 創(chuàng)建父節(jié)點,如果存在就跳過
//zkFluentClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
// TimeUnit.SECONDS.sleep(1);
zkFluentClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
zkFluentClient.setData().forPath(path + "/c1","update data".getBytes()); // update 沒看到執(zhí)行回調(diào),但是數(shù)據(jù)內(nèi)容已經(jīng)更新了
zkFluentClient.delete().guaranteed().forPath(path + "/c1");
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
}
這里會關(guān)注子節(jié)點的變化,但是update內(nèi)容貌似并沒有觸發(fā)listener的執(zhí)行,好神奇。