1 zk 簡介
zk 管理大數(shù)據(jù)生態(tài)系統(tǒng)中各個組件。(Hadoop、Hive、Spark)

zk應(yīng)用場景:
zk是一個經(jīng)典的分布式數(shù)據(jù)一致性解決方案。致力于為分布式應(yīng)用提供一個高性能、高可用,且具有嚴(yán)格順序訪問控制能力的分布式協(xié)調(diào)存儲服務(wù)。
- 維護(hù)配置信息
- 分布式鎖服務(wù)
- 集群管理
- 生成分布式唯一ID
維護(hù)配置信息:
在分布式集群上,需要保證每臺機(jī)器的配置信息一致。例如數(shù)據(jù)庫的url,用戶名密碼等。zk提供高可用、高性能的配置服務(wù),能夠快速高效的完成集群上配置項的更改,并且能夠保證集群上機(jī)器配置數(shù)據(jù)的一致性。
zk 提供配置服務(wù),使用Zab這種一致性協(xié)議來保證一致性。
例如在hbase中,客戶端先連接zk,獲取hbase集群的配置信息,然后才能操作。開源的消息隊列kafka中,也是使用zk來維護(hù)broker信息。在dubbo中也廣泛使用了zk來管理一些配置。

分布式鎖服務(wù):
在分布式系統(tǒng)中,多臺服務(wù)器運(yùn)行著相同的服務(wù)。當(dāng)多個服務(wù)器在運(yùn)行時就需要協(xié)調(diào)各個服務(wù)的進(jìn)度。zk提出了臨時有序節(jié)點(diǎn)的概念,通過加鎖,保證當(dāng)某個服務(wù)在調(diào)用時,其他服務(wù)不能進(jìn)行該操作。如果機(jī)器掛掉,釋放鎖并fail over到其他機(jī)器繼續(xù)執(zhí)行該服務(wù)。

集群管理:
集群中有時因為各種軟硬件故障或者網(wǎng)絡(luò)故障,出現(xiàn)服務(wù)器掛掉而被移除出集群或者服務(wù)加入集群。zk提供了watch機(jī)制,能夠?qū)⒎?wù)的移除/加入的情況通知給集群中其他正常工作的機(jī)器,以及時調(diào)整存儲和計算等任務(wù)的分配和執(zhí)行。zk還會對故障的機(jī)器做出診斷并嘗試修復(fù)。

生成分布式唯一ID:
每次生成一個新ID時,zk會創(chuàng)建一個持久順序節(jié)點(diǎn),創(chuàng)建操作返回的節(jié)點(diǎn)序號,即為新ID,然后把比自己小的ID刪掉。
zk的設(shè)計目標(biāo):
致力于為分布式應(yīng)用提供一個高性能、高可用、具有嚴(yán)格順序訪問控制能力的分布式協(xié)調(diào)服務(wù)。
- 高性能
zk將全量數(shù)據(jù)存儲在內(nèi)存中,并直接服務(wù)于客戶端的所有非事務(wù)請求。尤其適用于以讀為主的場景。
- 高可用
zk以集群的方式對外提供服務(wù)。每臺機(jī)器都會在內(nèi)存中維護(hù)當(dāng)前的服務(wù)器狀態(tài),并且各個機(jī)器之間互相保持通信。只要集群中超過一般的機(jī)器都能正常工作,那么集群就能正常對外提供服務(wù)。
- 嚴(yán)格順序訪問
對于來自客戶端的每個請求,zk都會分配一個全局唯一的遞增編號,這個編號反應(yīng)了所有事務(wù)操作的先后順序。
2 zk數(shù)據(jù)模型
zk進(jìn)行數(shù)據(jù)存儲時的數(shù)據(jù)模型
zk的數(shù)據(jù)節(jié)點(diǎn)是樹狀結(jié)構(gòu)(類似linux文件目錄結(jié)構(gòu))。
樹中的每個節(jié)點(diǎn)稱為znode(zookeeper node),一個znode可用由多個子節(jié)點(diǎn)。zk的數(shù)據(jù)節(jié)點(diǎn)在結(jié)構(gòu)上表現(xiàn)為樹狀結(jié)構(gòu)。使用path來定位到某個znode。比如/wh/node1/node11/node112,wh是根節(jié)點(diǎn)。
znode兼具文件和目錄兩種特點(diǎn)。
節(jié)點(diǎn)既像文件一樣維護(hù)了數(shù)據(jù)、元信息、ACL、時間戳等數(shù)據(jù)結(jié)構(gòu),又像目錄一樣構(gòu)成了樹結(jié)構(gòu)。作為path的一部分。

znode大體上分為三部分:
- 節(jié)點(diǎn)的數(shù)據(jù):即znode data(節(jié)點(diǎn)path,節(jié)點(diǎn)data的關(guān)系就像map的key-value一樣)。
- 節(jié)點(diǎn)的子節(jié)點(diǎn)children
- 節(jié)點(diǎn)的狀態(tài)stat:用來描述當(dāng)前節(jié)點(diǎn)的創(chuàng)建、修改記錄,包括cZxid、ctime等。
節(jié)點(diǎn)狀態(tài)stat的屬性:
在zk shell中,通過get命令查看指定路徑節(jié)點(diǎn)的data、stat信息:

屬性說明:
cZxid : 數(shù)據(jù)節(jié)點(diǎn)創(chuàng)建時的事務(wù)ID
ctime :數(shù)據(jù)節(jié)點(diǎn)創(chuàng)建時的時間
mZxid : 數(shù)據(jù)節(jié)點(diǎn)最后一次更新時的事務(wù)ID
mtime : 數(shù)據(jù)節(jié)點(diǎn)最后一次更新時的時間
pZxid : 數(shù)據(jù)節(jié)點(diǎn)的子節(jié)點(diǎn)最后一次被修改時的事務(wù)ID
cversion : 子節(jié)點(diǎn)的更改次數(shù)
dataVersion : 節(jié)點(diǎn)數(shù)據(jù)的更改次數(shù)
aclVersion : 節(jié)點(diǎn)的ACL的更改次數(shù)
ephemeralOwner : 如果節(jié)點(diǎn)是臨時節(jié)點(diǎn),則表示創(chuàng)建該節(jié)點(diǎn)的會話的sessionID;如果節(jié)點(diǎn)是持久節(jié)點(diǎn),則該屬性值為0
dataLength : 數(shù)據(jù)內(nèi)存長度
numChildren : 數(shù)據(jù)節(jié)點(diǎn)當(dāng)前子節(jié)點(diǎn)的數(shù)量
節(jié)點(diǎn)類型:
分為兩種:臨時節(jié)點(diǎn)和持久化節(jié)點(diǎn)。節(jié)點(diǎn)的類型在創(chuàng)建時即被確定,并且不能改變。
臨時節(jié)點(diǎn):該節(jié)點(diǎn)的生命周期依賴于創(chuàng)建它們的會話。一旦會話(session)結(jié)束,臨時節(jié)點(diǎn)就會被自動刪除。當(dāng)然也可用手動刪除。雖然每個臨時節(jié)點(diǎn)都會被綁定到一個客戶端會話,但他們對所有的客戶端都是可見的。臨時節(jié)點(diǎn)不允許擁有子節(jié)點(diǎn)。
持久化節(jié)點(diǎn):生命周期不依賴會話。并且只有在客戶端執(zhí)行刪除操作的時候,才能被刪除。
一般使用持久化有序節(jié)點(diǎn)來創(chuàng)建分布式唯一ID。
一般使用臨時有序節(jié)點(diǎn)來創(chuàng)建分布式鎖。
3 zk linux單機(jī)安裝
zk依賴jdk




客戶端登陸工具 ./zkCli.sh
./zkCli.sh -server ip
4 zk常用shell命令




無法直接delete含有子節(jié)點(diǎn)的節(jié)點(diǎn),要有rmr命令。




ls2 = ls + stat

一個監(jiān)聽器的注冊只能捕獲一次事件

5 針對zk 數(shù)據(jù)節(jié)點(diǎn)的權(quán)限控制
Acl權(quán)限控制, Access control list






多個ip授權(quán)
setAcl /node2 ip:192.168.1.1:cdrwa,ip:192.168.1.2:cdr





6 zk JavaAPI操作zk數(shù)據(jù)
通過Java API去操作zk中存儲的一系列數(shù)據(jù)。

package connection;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import javax.swing.plaf.IconUIResource;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZkConnection {
public static void main(String[] args) {
try {
// 計數(shù)器
// zk創(chuàng)建是異步的
final CountDownLatch latch = new CountDownLatch(1);
// arg1 zk server ip port
// arg2 client timeout mills
// arg3 watcher
ZooKeeper zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("connectioned ...");
latch.countDown();
}
}
});
// 阻塞直到連接創(chuàng)建成功
latch.await();
// 打印會話編號
System.out.println(zk.getSessionId());
System.out.println("all done...");
} catch (Exception e) {
e.printStackTrace();
}
}
}

package create;
import org.apache.zookeeper.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZkCreate1 {
ZooKeeper zk;
@Before
public void before() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("connectioned ...");
latch.countDown();
}
}
});
latch.await();
}
@After
public void after() throws InterruptedException {
zk.close();
}
@Test
public void testCreate1() throws KeeperException, InterruptedException {
// arg1 : node path
// arg2 : node data
// arg3 : 權(quán)限列表 world:anyone:cdrwa
// arg4 : 節(jié)點(diǎn)類型 持久化節(jié)點(diǎn)
zk.create("/wh/node2", "node2".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
package create;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ZkCreate1 {
ZooKeeper zk;
@Before
public void before() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("connectioned ...");
latch.countDown();
}
}
});
latch.await();
}
@After
public void after() throws InterruptedException {
zk.close();
}
@Test
public void testCreate1() throws KeeperException, InterruptedException {
// arg1 : node path
// arg2 : node data
// arg3 : 權(quán)限列表 world:anyone:cdrwa
// arg4 : 節(jié)點(diǎn)類型 持久化節(jié)點(diǎn)
zk.create("/wh/node2", "node2".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
@Test
public void create2() throws Exception {
// arg3 : world:anyone:r
zk.create("/wh/node3", "node3".getBytes(),
ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT);
}
@Test
public void create3() throws Exception {
// world授權(quán)模式
// 權(quán)限列表
List<ACL> acls = new ArrayList<ACL>();
// 授權(quán)模式和授權(quán)對象
Id id = new Id("world", "anyone");
// 權(quán)限設(shè)置
acls.add(new ACL(ZooDefs.Perms.READ, id));
acls.add(new ACL(ZooDefs.Perms.WRITE, id));
zk.create("/wh/node4", "node4".getBytes(),
acls, CreateMode.PERSISTENT);
}
@Test
public void create4() throws Exception {
// ip授權(quán)模式
// 權(quán)限列表
List<ACL> acls = new ArrayList<ACL>();
// 授權(quán)模式和授權(quán)對象
Id id = new Id("ip", "localhost");
// 權(quán)限設(shè)置
acls.add(new ACL(ZooDefs.Perms.ALL, id));
zk.create("/wh/node5", "node5".getBytes(),
acls, CreateMode.PERSISTENT);
}
@Test
public void create5() throws Exception {
// auth授權(quán)模式
// 添加授權(quán)用戶
zk.addAuthInfo("digest", "wh:1234".getBytes());
zk.create("/wh/node5", "node5".getBytes(),
ZooDefs.Ids.CREATOR_ALL_ACL , CreateMode.PERSISTENT);
}
@Test
public void create6() throws Exception {
// auth授權(quán)模式
// 添加授權(quán)用戶
zk.addAuthInfo("digest", "wh:1234".getBytes());
// 權(quán)限列表
List<ACL> acls = new ArrayList<ACL>();
// 授權(quán)模式和授權(quán)對象
Id id = new Id("auth", "wh");
// 權(quán)限設(shè)置
acls.add(new ACL(ZooDefs.Perms.READ, id));
zk.create("/wh/node5", "node5".getBytes(),
acls , CreateMode.PERSISTENT);
}
@Test
public void create7() throws Exception {
// digest授權(quán)模式
// 權(quán)限列表
List<ACL> acls = new ArrayList<ACL>();
// 授權(quán)模式和授權(quán)對象
Id id = new Id("digest", "passwdMd5");
// 權(quán)限設(shè)置
acls.add(new ACL(ZooDefs.Perms.ALL, id));
zk.create("/wh/node5", "node5".getBytes(),
acls , CreateMode.PERSISTENT);
}
@Test
public void create8() throws Exception {
// 持久化順序節(jié)點(diǎn)
// Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
String result = zk.create("/wh/node5", "node5".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println(result); // 節(jié)點(diǎn)路徑
}
@Test
public void create9() throws Exception {
// 臨時節(jié)點(diǎn)
// Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
String result = zk.create("/wh/node5", "node5".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.EPHEMERAL);
System.out.println(result); // 節(jié)點(diǎn)路徑
}
@Test
public void create10() throws Exception {
// 臨時順序節(jié)點(diǎn)
// Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
String result = zk.create("/wh/node5", "node5".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(result); // 節(jié)點(diǎn)路徑
}
@Test
public void create11() throws Exception {
// 異步方式創(chuàng)建節(jié)點(diǎn)
zk.create("/wh/node5", "node5".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
new AsyncCallback.StringCallback() {
public void processResult(int rc, String path, Object ctx, String name) {
// 0 代表創(chuàng)建成功
System.out.println(rc);
// node path
System.out.println(path);
// node name
System.out.println(name);
// 上下文參數(shù)
System.out.println(ctx);
}
}, "I am context");
Thread.sleep(10000);
System.out.println("all done...");
}
}

package set;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ZkSet1 {
ZooKeeper zk;
@Before
public void before() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("connectioned ...");
latch.countDown();
}
}
});
latch.await();
}
@After
public void after() throws InterruptedException {
zk.close();
}
@Test
public void set1() throws KeeperException, InterruptedException {
// arg1 : node path
// arg2 : new node data
// arg3 : 版本號,-1代表版本號不參與更新
Stat stat = zk.setData("/wh/node2", "node2".getBytes(), -1);
System.out.println(stat.getCzxid());
}
@Test
public void set2() throws KeeperException, InterruptedException {
// 異步方式修改節(jié)點(diǎn)
// arg1 : node path
// arg2 : new node data
// arg3 : 版本號,-1代表版本號不參與更新
zk.setData("/wh/node2", "node2".getBytes(), -1
, new AsyncCallback.StatCallback() {
public void processResult(int rc, String path, Object ctx, Stat stat) {
// 0 代表創(chuàng)建成功
System.out.println(rc);
// node path
System.out.println(path);
// 上下文參數(shù)
System.out.println(ctx);
// 屬性描述對象
System.out.println(stat.getVersion());
}
}, "I am context");
Thread.sleep(10000);
System.out.println("all done...");
}
}

package delete;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZkDel1 {
ZooKeeper zk;
@Before
public void before() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("connectioned ...");
latch.countDown();
}
}
});
latch.await();
}
@After
public void after() throws InterruptedException {
zk.close();
}
@Test
public void del1() throws KeeperException, InterruptedException {
// arg1 : node path
// arg2 : 版本號,-1代表版本號不參與更新
zk.delete("/wh/node2", -1);
}
@Test
public void del2() throws KeeperException, InterruptedException {
// 異步方式刪除節(jié)點(diǎn)
// arg1 : node path
// arg2 : new node data
// arg3 : 版本號,-1代表版本號不參與更新
zk.delete("/wh/node2", -1
, new AsyncCallback.VoidCallback() {
public void processResult(int rc, String path, Object ctx) {
// 0 代表創(chuàng)建成功
System.out.println(rc);
// node path
System.out.println(path);
// 上下文參數(shù)
System.out.println(ctx);
}
}, "I am context");
Thread.sleep(10000);
System.out.println("all done...");
}
}

package get;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZkGet1 {
ZooKeeper zk;
@Before
public void before() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("connectioned ...");
latch.countDown();
}
}
});
latch.await();
}
@After
public void after() throws InterruptedException {
zk.close();
}
@Test
public void get1() throws KeeperException, InterruptedException {
// arg1 : node path
// arg3 : 讀取節(jié)點(diǎn)屬性的對象
Stat stat = new Stat();
byte[] res = zk.getData("/wh/node2", false, stat);
// 打印節(jié)點(diǎn)數(shù)據(jù)
System.out.println(new String(res));
// 版本信息
System.out.println(stat.getVersion());
}
@Test
public void get2() throws KeeperException, InterruptedException {
// 異步方式獲取節(jié)點(diǎn)
// arg1 : node path
// arg3 : 版本號,-1代表版本號不參與更新
zk.getData("/wh/node2", false, new AsyncCallback.DataCallback() {
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
// 0 代表創(chuàng)建成功
System.out.println(rc);
// node path
System.out.println(path);
// 上下文參數(shù)
System.out.println(ctx);
// data
System.out.println(new String(data));
// 屬性描述對象
System.out.println(stat.getVersion());
}
}, "I am context");
Thread.sleep(10000);
System.out.println("all done...");
}
}

package getChildren;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ZkGetChild {
ZooKeeper zk;
@Before
public void before() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("connectioned ...");
latch.countDown();
}
}
});
latch.await();
}
@After
public void after() throws InterruptedException {
zk.close();
}
@Test
public void getChild1() throws KeeperException, InterruptedException {
// arg1 : node path
List<String> res = zk.getChildren("/wh/node2", false);
// 打印節(jié)點(diǎn)數(shù)據(jù)
for (String child : res)
System.out.println(child);
}
@Test
public void get2() throws KeeperException, InterruptedException {
// 異步方式獲取子節(jié)點(diǎn)
// arg1 : node path
zk.getChildren("/wh/node2", false, new AsyncCallback.ChildrenCallback() {
public void processResult(int rc, String path, Object ctx, List<String> children) {
// 0 代表創(chuàng)建成功
System.out.println(rc);
// node path
System.out.println(path);
// 上下文參數(shù)
System.out.println(ctx);
// 屬性描述對象
for (String child : children) {
System.out.println(child);
}
}
}, "I am context");
Thread.sleep(10000);
System.out.println("all done...");
}
}

package exist;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZkExistNode {
ZooKeeper zk;
@Before
public void before() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("connectioned ...");
latch.countDown();
}
}
});
latch.await();
}
@After
public void after() throws InterruptedException {
zk.close();
}
@Test
public void exists() throws KeeperException, InterruptedException {
// arg1 : node path
Stat stat = zk.exists("/wh/node2", false);
// 打印節(jié)點(diǎn)數(shù)據(jù)
System.out.println(stat);
// 版本信息
System.out.println(stat.getVersion());
}
@Test
public void exists1() throws KeeperException, InterruptedException {
// 異步方式獲取節(jié)點(diǎn)
// arg1 : node path
zk.exists("/wh/node2", false, new AsyncCallback.StatCallback() {
public void processResult(int rc, String path, Object ctx, Stat stat) {
// 0 代表創(chuàng)建成功
System.out.println(rc);
// node path
System.out.println(path);
// 上下文參數(shù)
System.out.println(ctx);
// 屬性描述對象
System.out.println(stat.getVersion());
}
}, "I am context");
Thread.sleep(10000);
System.out.println("all done...");
}
}