前提
建立maven項(xiàng)目中 要導(dǎo)入zookeeper的依賴
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
我們同時(shí)可以打開(kāi)linux中的zookeeper客戶端來(lái)驗(yàn)證對(duì)比。輸入 zkCli.sh 便可以進(jìn)入zookeeper客戶端 。
一、建立連接
直接建立連接后,不進(jìn)行等待判斷 運(yùn)行結(jié)果為連接中(CONNECTING)。
清單1 連接中 zookeeper
// 一、沒(méi)有連接成功
public static void main(String[] args) {
// zookeeper集群 ip :客戶端口
String kvm="192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181";
try {
ZooKeeper zooKeeper=new ZooKeeper(kvm,4000,null);
// CONNECTING 連接中 ==》根據(jù)zookeeper的四個(gè)狀態(tài),可知、;沒(méi)有連接成功
System.out.println(zooKeeper.getState());
zooKeeper.close(); // 關(guān)閉連接
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
添加CountDownLatch同步工具類.
清單2 連接zookeeper成功
// 二、連接成功 zookeeper
/**
* 同步工具類,它允許一個(gè)或多個(gè)線程一直等待,直到其他線程的操作執(zhí)行完后再執(zhí)行。
*/
public static void main(String[] args) {
final CountDownLatch countDownLatch=new CountDownLatch(1);
String kvm="192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181";
try {
ZooKeeper zooKeeper=new ZooKeeper(kvm, 4000, new Watcher() {
// process: 觀察者隊(duì)列
@Override
public void process(WatchedEvent watchedEvent) {
// SyncConnected :同步連接
if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
// 如果收到服務(wù)端的響應(yīng)時(shí)間,連接成功
countDownLatch.countDown();
System.out.println("建立連接成功");
}
}
});
countDownLatch.await();
System.out.println(zooKeeper.getState()); // CONNECTED :連接成功
zooKeeper.close();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
二、數(shù)據(jù)的增刪改查操作
基于代碼的增刪改查操作,我們可以去到zookeeper中的zkCli客戶端窗口中去驗(yàn)證。
進(jìn)入zkCli客戶端窗口:在bin目錄下 直接輸入 zkCli.sh 回車,便可進(jìn)入。
清單1 事務(wù)操作
public static void main(String[] args) {
try {
final CountDownLatch countDownLatch=new CountDownLatch(1);
java.lang.String kvm="192.168.0.11:2181,192.168.0.13:2181,192.168.0.12:2181";
java.lang.String kvm2="192.168.0.85:2181";
ZooKeeper zooKeeper =new ZooKeeper(kvm, 4000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (Event.KeeperState.SyncConnected==watchedEvent.getState()){
countDownLatch.countDown();//如果收到了服務(wù)端的響應(yīng)時(shí)間,連接成功
System.out.println("zk 建立連接");
}
}
});
countDownLatch.await();
System.out.println(zooKeeper.getState());// connected 成功
/**
* zookeeper數(shù)據(jù)的增刪改查
*/
// 1.新增節(jié)點(diǎn)
zooKeeper.create("/zk-wcl","0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Thread.sleep(1000);
Stat stat=new Stat();// 狀態(tài)
// 2.查看新增后的節(jié)點(diǎn)狀態(tài)
byte[] bytes=zooKeeper.getData("/zk-wcl",null,stat);
System.out.println(new String(bytes));
// 3.修改節(jié)點(diǎn)
zooKeeper.setData("/zk-wcl","1".getBytes(),stat.getVersion());
// 4.查看修改后的節(jié)點(diǎn)狀態(tài)
byte[] byte2=zooKeeper.getData("/zk-wcl",null,stat);
System.out.println(new String(byte2));
// 5.刪除節(jié)點(diǎn)
zooKeeper.delete("/zk-wcl",stat.getVersion());
zooKeeper.close();
System.in.read();// 當(dāng)前進(jìn)程阻塞
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
三、事件機(jī)制
介紹
Watcher監(jiān)聽(tīng)機(jī)制是ZooKeeper中非常重要的特性,我們基于zookeeper上創(chuàng)建的節(jié)點(diǎn),可以對(duì)這些節(jié)點(diǎn)綁定監(jiān)聽(tīng)事件,比如可以監(jiān)聽(tīng)節(jié)點(diǎn)數(shù)據(jù)變更、節(jié)點(diǎn)刪除、子節(jié)點(diǎn)狀態(tài)變更等事件。通過(guò)這個(gè)事件機(jī)制,可以基于zookeeper實(shí)現(xiàn)分布式鎖、集群管理功能。
Watcher特性:當(dāng)數(shù)據(jù)發(fā)生變化的時(shí)候,zookeeper會(huì)產(chǎn)生一個(gè)Watcher事件,并且會(huì)發(fā)送到客戶端,但是客戶端只會(huì)接受一次通知。如果后續(xù)這個(gè)節(jié)點(diǎn)再次發(fā)生變化,那么之前的設(shè)置watcher的客戶端不會(huì)再次受到消息。
watcher是一次性操作??梢酝ㄟ^(guò)循環(huán)監(jiān)聽(tīng)去達(dá)到永久監(jiān)聽(tīng)效果。
1、如何注冊(cè)事件機(jī)制
分為兩步:綁定事件、觸發(fā)事件。
第一步:通過(guò)getDate、exists 、getChildren這三個(gè)操作來(lái)綁定事件。
第二步:凡是事務(wù)類型的操作,都會(huì)觸發(fā)監(jiān)聽(tīng)事件。
事務(wù)操作:create 、``delete````、 setData
清單1:創(chuàng)建刪除節(jié)點(diǎn)的事件注冊(cè)監(jiān)聽(tīng)實(shí)例
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
/*------------------基于Java API 建立連接---------------------------*/
final CountDownLatch countDownLatch=new CountDownLatch(1);
java.lang.String kvm="192.168.0.11:2181,192.168.0.13:2181,192.168.0.12:2181";
java.lang.String kvm2="192.168.0.85:2181";
// TODO: 這兒是全局的Watcher
ZooKeeper zooKeeper =new ZooKeeper(kvm, 4000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// 默認(rèn)事件
System.out.println("默認(rèn)事件:"+watchedEvent.getType());
if (Event.KeeperState.SyncConnected==watchedEvent.getState()){
countDownLatch.countDown();//如果收到了服務(wù)端的響應(yīng)時(shí)間,連接成功
System.out.println("zk 建立連接");
}
}
});
countDownLatch.await();
/*--------------如何注冊(cè)、監(jiān)聽(tīng)事件機(jī)制-------------------------*/
// 一、去創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn)
zooKeeper.create("/zk-test-wcl","1".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
/**
* 二、針對(duì)這個(gè)節(jié)點(diǎn) 綁定事件 3 種方式
* (exists getdata getchildren)
* 解釋:
* zooKeeper.exists(String path,Watcher watcher)中的參數(shù)
* watcher為true時(shí),代表默認(rèn)觸發(fā)全局 zookeeper 中的內(nèi)部類
*
* 演示:
* 我們以 exists 來(lái) 綁定事件
*/
Stat stat=zooKeeper.exists("/zk-test-wcl", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent.getType()+"-->"+watchedEvent.getPath());
// TODO:NOTE ==> 因?yàn)閣atcher是一次性操作,只能看到setData操作帶來(lái)的變化,delete操作看不到變化。
// TODO:NOTE ==> 所以,要在綁定一次事件,來(lái)持續(xù)監(jiān)聽(tīng)
try {
// TODO :注意true 默認(rèn)用的是全局的watcher
zooKeeper.exists(watchedEvent.getPath(),true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 通過(guò)修改的事務(wù)類型操作來(lái)觸發(fā)監(jiān)聽(tīng)事件
// TODO:1、setData
stat = zooKeeper.setData("/zk-test-wcl","2".getBytes(),
stat.getVersion());
Thread.sleep(1000);
// TODO: 2、delete
zooKeeper.delete("/zk-test-wcl",stat.getVersion());
System.in.read();
}
2、watcher事件類型
| 事件類型 | 含義 |
|---|---|
| None(-1) | 客戶端連接狀態(tài)發(fā)生改變時(shí),會(huì)受到none事件 |
| NodeCreated(1) | 創(chuàng)建節(jié)點(diǎn)事件 |
| NodeDeleted(2) | 刪除節(jié)點(diǎn)事件 |
| NodeDataChanged(3) | 節(jié)點(diǎn)數(shù)據(jù)發(fā)生變更 |
| NodeChildrenChanged(4) | 子節(jié)點(diǎn)被創(chuàng)建、被刪除會(huì)發(fā)生事件觸發(fā) |
3、什么樣的操作會(huì)產(chǎn)生什么類型的事件
待續(xù)。。。。。
4、事件的實(shí)現(xiàn)原理
以上內(nèi)容是關(guān)于zookeeper 的zkCli客戶端java API的實(shí)操。其實(shí)zookeeper還有一個(gè)客戶端Curator。Curator是對(duì)zookeeper原生Java API的封裝。下面來(lái)記錄下,我貼下練習(xí)的代碼。
四、zookeeper客戶端:Curator 在java API中的實(shí)操

前提:
在pom.xml文件中導(dǎo)入依賴的Curator包
<!--開(kāi)源客戶端 Curator 依賴包 start--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> <!--開(kāi)源客戶端 Curator 依賴包 end -->
1、獲取,創(chuàng)建,修改信息
建立連接并獲取curator節(jié)點(diǎn),注意,這個(gè)節(jié)點(diǎn)必須是已經(jīng)存在的。
清單1、客戶端的鏈接及獲取命名空間
String kvm = "192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181";
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().
connectString(kvm).sessionTimeoutMs(4000).
retryPolicy(new ExponentialBackoffRetry(100, 3)).
namespace("curator").build();
curatorFramework.start();
清單2、創(chuàng)建一個(gè)多級(jí)節(jié)點(diǎn)
在原生Java API中創(chuàng)建節(jié)點(diǎn),必須逐層創(chuàng)建,即必須先存在父節(jié)點(diǎn),子節(jié)點(diǎn)才能創(chuàng)建。像上面中已經(jīng)介紹過(guò)的zookeeper zkCli客戶端。
現(xiàn)在在進(jìn)一步封裝的Curator客戶端,便可以直接創(chuàng)建一個(gè)多級(jí)節(jié)點(diǎn)。
curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).
forPath("/wcl/node1","1".getBytes());
清單3、獲取信息并修改信息
Stat stat = new Stat();
curatorFramework.getData().storingStatIn(stat).forPath("/wcl/node1");
curatorFramework.setData().withVersion(stat.getVersion()).forPath("/wcl/node1", "xx".getBytes());
注意:在main()方法的最后 要關(guān)閉。添加這句代碼:curatorFramework.close();