Zookeeper 客戶端 Curator
概述
Curator是Netflix公司開源的一套Zookeeper客戶端框架,解決了很多Zookeeper客戶端非常底層的細(xì)節(jié)開發(fā)工作,包括連接重連、反復(fù)注冊Watcher和NodeExistsException異常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”給Curator予高度評價。
Curator包含了幾個包:
-
curator-framework:對
Zookeeper的底層API的一些封裝。 - curator-client:提供一些客戶端的操作,例如重試策略等。
- curator-recipes:封裝了一些高級特性,如:Cache事件監(jiān)聽、選舉、分布式鎖、分布式計數(shù)器、分布式Barrier等。
客戶端
創(chuàng)建
靜態(tài)方法創(chuàng)建
// 重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
connectionInfo,
5000,
3000,
retryPolicy);
Fluent API 創(chuàng)建
// 重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
創(chuàng)建包含命名空間的客戶端
// 重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("base") // 命名空間,即該客戶端可操作目錄的根目錄
.build();
啟動
client.start();
節(jié)點
Zookeeper 節(jié)點類型
- PERSISTENT: 持久化
- PERSISTENT_SEQUENTIAL: 持久化并且?guī)蛄刑?/li>
- EPHEMERAL: 臨時
- EPHEMERAL_SEQUENTIAL: 臨時并且?guī)蛄刑?/li>
創(chuàng)建
// 創(chuàng)建節(jié)點,內(nèi)容為空
client.create().forPath("path");
// 創(chuàng)建節(jié)點,內(nèi)容為“init”
client.create().forPath("path","init".getBytes());
// 創(chuàng)建臨時節(jié)點,內(nèi)容為空
client.create().withMode(CreateMode.EPHEMERAL).forPath("path");
// 創(chuàng)建臨時節(jié)點,內(nèi)容為“init”
client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());
// 創(chuàng)建臨時節(jié)點,內(nèi)容為“init”,遞歸創(chuàng)建父節(jié)點
client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("path","init".getBytes());
刪除
// 刪除節(jié)點
client.delete().forPath("path");
// 刪除節(jié)點,遞歸刪除子節(jié)點
client.delete().deletingChildrenIfNeeded().forPath("path");
// 刪除制定版本節(jié)點
client.delete().withVersion(10086).forPath("path");
// 強制刪除節(jié)點
client.delete().guaranteed().forPath("path");
讀取
// 讀取節(jié)點內(nèi)容
client.getData().forPath("path");
// 讀取節(jié)點內(nèi)容及狀態(tài)
client.getData().storingStatIn(stat).forPath("path");
更新
// 更新節(jié)點
client.setData().forPath("path","data".getBytes());
// 指定版本更新節(jié)點
client.setData().withVersion(10086).forPath("path","data".getBytes());
節(jié)點是否存在
client.checkExists().forPath("path");
獲取所有子節(jié)點
client.getChildren().forPath("path");
事務(wù)
client.inTransaction().check().forPath("path")
.and()
.create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
.and()
.setData().withVersion(10086).forPath("path","data2".getBytes())
.and()
.commit();
異步接口
Curator提供異步接口,引入了BackgroundCallback接口用于處理異步接口調(diào)用之后服務(wù)端返回的結(jié)果信息。BackgroundCallback接口中一個重要的回調(diào)值為CuratorEvent,里面包含事件類型、響應(yīng)嗎和節(jié)點的詳細(xì)信息。
CuratorEventType
| 事件類型 | 對應(yīng)CuratorFramework實例的方法 |
|---|---|
| CREATE | create() |
| DELETE | delete() |
| EXISTS | checkExists() |
| GET_DATA | getData() |
| SET_DATA | setData() |
| CHILDREN | getChildren() |
| SYNC | sync(String,Object) |
| GET_ACL | getACL() |
| SET_ACL | setACL() |
| WATCHED | Watcher(Watcher) |
| CLOSING | close() |
響應(yīng)碼
| 響應(yīng)碼 | 意義 |
|---|---|
| 0 | OK,即調(diào)用成功 |
| -4 | ConnectionLoss,即客戶端與服務(wù)端斷開連接 |
| -110 | NodeExists,即節(jié)點已經(jīng)存在 |
| -112 | SessionExpired,即會話過期 |
Executor executor = Executors.newFixedThreadPool(2);
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground((curatorFramework, curatorEvent) -> { System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
},executor) //如果inBackground()方法不指定executor,那么會默認(rèn)使用Curator的EventThread去進(jìn)行異步處理
.forPath("path");
緩存
強烈推薦使用ConnectionStateListener監(jiān)控連接的狀態(tài),當(dāng)連接狀態(tài)為LOST,curator-recipes下的所有Api將會失效或者過期,盡管后面所有的例子都沒有使用到ConnectionStateListener。
Zookeeper原生支持通過注冊Watcher來進(jìn)行事件監(jiān)聽,但是開發(fā)者需要反復(fù)注冊(Watcher只能單次注冊單次使用)。Cache是Curator中對事件監(jiān)聽的包裝,可以看作是對事件監(jiān)聽的本地緩存視圖,能夠自動為開發(fā)者處理反復(fù)注冊監(jiān)聽。Curator提供了三種Watcher(Cache)來監(jiān)聽結(jié)點的變化。
Path Cache
Path Cache用來監(jiān)控一個ZNode的子節(jié)點。當(dāng)一個子節(jié)點增加,更新,刪除時,Path Cache會改變它的狀態(tài), 會包含最新的子節(jié)點,子節(jié)點的數(shù)據(jù)和狀態(tài),而狀態(tài)的更變將通過PathChildrenCacheListener通知。
實際使用時會涉及到四個類:
- PathChildrenCache
- PathChildrenCacheEvent
- PathChildrenCacheListener
- ChildData
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
想使用cache,必須調(diào)用它的start方法,使用完后調(diào)用close方法。 可以設(shè)置StartMode來實現(xiàn)啟動的模式。
StartMode有下面幾種:
- NORMAL:正常初始化。
- BUILD_INITIAL_CACHE:在調(diào)用
start()之前會調(diào)用rebuild()。 - POST_INITIALIZED_EVENT: 當(dāng)Cache初始化數(shù)據(jù)后發(fā)送一個PathChildrenCacheEvent.Type#INITIALIZED事件
public void addListener(PathChildrenCacheListener listener) 可以增加listener監(jiān)聽緩存的變化。
getCurrentData()方法返回一個List<ChildData>對象,可以遍歷所有的子節(jié)點。
設(shè)置/更新、移除其實是使用client (CuratorFramework)來操作, 不通過PathChildrenCache操作:
public class PathCacheDemo {
private static final String PATH = "/example/pathCache";
public static void main(String[] args) throws Exception {
// 創(chuàng)建客戶端
String connectInfo = "localhost:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
// 啟動
client.start();
// cache
PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
// cache start
cache.start();
// cache listener
PathChildrenCacheListener cacheListener = (client1, event) -> {
System.out.println("事件類型:" + event.getType());
if (null != event.getData()) {
System.out.println("節(jié)點數(shù)據(jù):" + event.getData().getPath() + " = " + new String(event.getData().getData()));
}
};
cache.getListenable().addListener(cacheListener);
// test
client.create().creatingParentsIfNeeded().forPath(PATH + "/test01", "01".getBytes());
ThreadUtil.sleep(10);
client.create().creatingParentsIfNeeded().forPath(PATH + "/test02", "02".getBytes());
ThreadUtil.sleep(10);
client.setData().forPath(PATH + "/test01", "01_V2".getBytes());
ThreadUtil.sleep(10);
for (ChildData data : cache.getCurrentData()) {
System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
}
client.delete().forPath(PATH + "/test01");
ThreadUtil.sleep(10);
client.delete().forPath(PATH + "/test02");
ThreadUtil.sleep(10);
cache.close();
client.close();
System.out.println("OK!");
}
}
注意:如果new PathChildrenCache(client, PATH, true)中的參數(shù)cacheData值設(shè)置為false,則示例中的event.getData().getData()、data.getData()將返回null,cache將不會緩存節(jié)點數(shù)據(jù)。
注意:示例中的Thread.sleep(10)可以注釋掉,但是注釋后事件監(jiān)聽的觸發(fā)次數(shù)會不全,這可能與PathCache的實現(xiàn)原理有關(guān),不能太過頻繁的觸發(fā)事件!
Node Cache
Node Cache與Path Cache類似,Node Cache只是監(jiān)聽某一個特定的節(jié)點。它涉及到下面的三個類:
-
NodeCache- Node Cache實現(xiàn)類 -
NodeCacheListener- 節(jié)點監(jiān)聽器 -
ChildData- 節(jié)點數(shù)據(jù)
注意:使用cache,依然要調(diào)用它的start()方法,使用完后調(diào)用close()方法。
getCurrentData()將得到節(jié)點當(dāng)前的狀態(tài),通過它的狀態(tài)可以得到當(dāng)前的值。
public class NodeCacheDemo {
private static final String PATH = "/example/nodeCache";
public static void main(String[] args) throws Exception {
// 創(chuàng)建客戶端
String connectInfo = "localhost:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
// 啟動
client.start();
client.create().creatingParentsIfNeeded().forPath(PATH);
// cache
NodeCache cache = new NodeCache(client, PATH);
// cache start
cache.start();
// cache listener
NodeCacheListener cacheListener = () -> {
ChildData data = cache.getCurrentData();
if (null != data) {
System.out.println("節(jié)點數(shù)據(jù):" + new String(cache.getCurrentData().getData()));
} else {
System.out.println("節(jié)點被刪除!");
}
};
cache.getListenable().addListener(cacheListener);
// test
client.setData().forPath(PATH, "01".getBytes());
Thread.sleep(10);
client.setData().forPath(PATH, "02".getBytes());
Thread.sleep(10);
client.delete().deletingChildrenIfNeeded().forPath(PATH);
Thread.sleep(10);
cache.close();
client.close();
System.out.println("OK!");
}
}
注意:示例中的Thread.sleep(10)可以注釋,但是注釋后事件監(jiān)聽的觸發(fā)次數(shù)會不全,這可能與NodeCache的實現(xiàn)原理有關(guān),不能太過頻繁的觸發(fā)事件!
注意:NodeCache只能監(jiān)聽一個節(jié)點的狀態(tài)變化。
Tree Cache
Tree Cache可以監(jiān)控整個樹上的所有節(jié)點,類似于PathCache和NodeCache的組合,主要涉及到下面四個類:
- TreeCache - Tree Cache實現(xiàn)類
- TreeCacheListener - 監(jiān)聽器類
- TreeCacheEvent - 觸發(fā)的事件類
- ChildData - 節(jié)點數(shù)據(jù)
public class TreeCacheDemo {
private static final String PATH = "/example/cache";
public static void main(String[] args) throws Exception {
// 創(chuàng)建客戶端
String connectInfo = "localhost:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
// 啟動
client.start();
client.create().creatingParentsIfNeeded().forPath(PATH);
// cache
TreeCache cache = new TreeCache(client, PATH);
// cache start
cache.start();
// cache listener
TreeCacheListener cacheListener = (client1, event) -> System.out.println("事件類型:" + event.getType() +
" | 路徑:" + (null != event.getData() ? event.getData().getPath() : null) +
" | 數(shù)據(jù):" + (null != event.getData() ? new String(event.getData().getData()) : null));
cache.getListenable().addListener(cacheListener);
// test
client.setData().forPath(PATH, "01".getBytes());
// Thread.sleep(10);
client.setData().forPath(PATH, "02".getBytes());
// Thread.sleep(10);
client.delete().deletingChildrenIfNeeded().forPath(PATH);
// Thread.sleep(10);
cache.close();
client.close();
System.out.println("OK!");
}
}
注意:在此示例中沒有使用Thread.sleep(10),但是事件觸發(fā)次數(shù)也是正常的。
注意:TreeCache在初始化(調(diào)用start()方法)的時候會回調(diào)TreeCacheListener實例一個事TreeCacheEvent,而回調(diào)的TreeCacheEvent對象的Type為INITIALIZED,ChildData為null,此時event.getData().getPath()很有可能導(dǎo)致空指針異常,這里應(yīng)該主動處理并避免這種情況。