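1. Overview
ZooKeeper is a third-party distributed service program that serves other distributed programs. What it stores is state data. Its main uses:
Data storage (the state and data of every machine in a distributed system)
Node monitoring (detecting when a node goes offline)
Leader/follower election among distributed servers, and reassignment when a server goes offline
Configuration management (the common configuration parameters and files shared by distributed servers, used to push configuration updates)
Distributed shared locks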
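2. Basic requirements
Because of these roles, ZooKeeper itself must run as a highly reliable distributed cluster. It uses an odd number of nodes: at least one, typically three. As long as more than half of the nodes are alive, ZooKeeper keeps serving.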
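The "more than half alive" rule is plain arithmetic. Below is a minimal sketch; the class and method names are made up for illustration. The quorum for n servers is n/2 + 1 (integer division). So 3 servers tolerate 1 failure and 5 tolerate 2, while 4 servers still tolerate only 1. That is why odd sizes are preferred.

```java
public class Quorum {
    // Smallest number of live servers that is still a strict majority of n.
    static int quorumSize(int n) {
        return n / 2 + 1;
    }

    // How many servers may fail while the ensemble keeps serving.
    static int tolerableFailures(int n) {
        return n - quorumSize(n);
    }

    // "More than half of the nodes are alive."
    static boolean canServe(int alive, int n) {
        return alive >= quorumSize(n);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 6; n++) {
            System.out.printf("servers=%d quorum=%d tolerated failures=%d%n",
                    n, quorumSize(n), tolerableFailures(n));
        }
    }
}
```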
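3. How ZooKeeper assigns roles in the cluster

The three machines shown in the figure must be configured with ids 1, 2, and 3. The configuration file determines which candidate nodes the ZooKeeper cluster has.
Start mini1 first. It starts a vote in the cluster, finds that it is the only server running, and votes for itself. mini1 now has one vote.
Then start mini2. mini1 sees mini2 come online and gives it one vote. mini2 votes for itself first and then gives mini1 one vote. Now mini1 and mini2 each have 2 votes.
With two votes each, assume the tie is broken by id. Since mini2's id is greater than mini1's, both mini1 and mini2 vote for mini2. mini2 now has 4 votes and mini1 has 2, so mini2 becomes the leader and mini1 becomes a follower.
Finally mini3 comes online, finds that mini2 is already the leader, and joins as a follower.
When data is submitted to a follower, the follower forwards it to the leader, and the leader then distributes it to the followers in the cluster.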
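The vote counting above is a simplified model. In ZooKeeper's FastLeaderElection, a server switches its vote to a proposed candidate under any of these conditions:

- The candidate has a higher epoch.
- The epoch is equal and the candidate has a higher last transaction id (zxid).
- Epoch and zxid are both equal and the candidate has a higher myid.

A leader is confirmed once a majority (2 of 3 here) agrees on the same vote. The sketch below models only that comparison order. `Vote`, `beats`, and `best` are hypothetical names, not ZooKeeper classes.

```java
import java.util.Arrays;
import java.util.List;

public class VoteOrder {
    // Hypothetical vote: (epoch, zxid, myid) of the proposed leader.
    static final class Vote {
        final long epoch;
        final long zxid;
        final long myid;

        Vote(long epoch, long zxid, long myid) {
            this.epoch = epoch;
            this.zxid = zxid;
            this.myid = myid;
        }
    }

    // Same ordering as FastLeaderElection: higher epoch, then higher zxid, then higher myid.
    static boolean beats(Vote proposed, Vote current) {
        if (proposed.epoch != current.epoch) return proposed.epoch > current.epoch;
        if (proposed.zxid != current.zxid) return proposed.zxid > current.zxid;
        return proposed.myid > current.myid;
    }

    // Servers keep switching to the best vote they hear, so they converge on this one.
    static Vote best(List<Vote> votes) {
        Vote best = votes.get(0);
        for (Vote v : votes) {
            if (beats(v, best)) best = v;
        }
        return best;
    }

    public static void main(String[] args) {
        // mini1 and mini2 are up with no data yet (zxid 0): the higher myid wins.
        System.out.println(best(Arrays.asList(new Vote(0, 0, 1), new Vote(0, 0, 2))).myid);
        // If mini1 had applied more transactions, it would win despite its lower myid.
        System.out.println(best(Arrays.asList(new Vote(0, 5, 1), new Vote(0, 0, 2))).myid);
    }
}
```

In a freshly started cluster every zxid is 0. So in practice the author's "larger id wins" description holds among the servers that are up when the majority forms.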
4. ZooKeeper official site: https://zookeeper.apache.org/
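5. Configuring ZooKeeper
5.1 Configuration file (rename zoo_sample.cfg to zoo.cfg)
A detailed explanation of /conf/zoo.cfg: https://www.cnblogs.com/xiohao/p/5541093.html
5.2 Example configuration
This configuration is identical on every machine in the ZooKeeper cluster. The hostnames used in the server.N lines (zookeeper1, zookeeper2, zookeeper3 below) must be mapped to IP addresses. If you don't want a hostname-to-IP mapping, write the hosts' IP addresses instead.
Compared with zoo_sample.cfg, the changes are dataDir and the server.N lines (with their comments) at the end: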
    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just
    # example sakes.
    dataDir=/kluter/zookeeper-3.4.11/data
    # the port at which the clients will connect
    clientPort=2181
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    #
    # Be sure to read the maintenance section of the
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1
    #configure myid and port1/port2
    #port1:default leader/follower port 2888
    #port2:default vote port 3888
    server.1=zookeeper1:2888:3888
    server.2=zookeeper2:2888:3888
    server.3=zookeeper3:2888:3888
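initLimit and syncLimit are counted in ticks, not milliseconds. The worked example below converts them. `TickMath` is a hypothetical helper.

It also applies the server's default session-timeout bounds, which hold when minSessionTimeout and maxSessionTimeout are not set in zoo.cfg:

- the minimum is 2 × tickTime;
- the maximum is 20 × tickTime.

With tickTime=2000, the sessionTimeout of 2000 ms requested by the Java clients in section 7 would be negotiated up to 4000 ms.

```java
public class TickMath {
    // initLimit and syncLimit are expressed in ticks; convert them to milliseconds.
    static int limitMillis(int ticks, int tickTime) {
        return ticks * tickTime;
    }

    // Without minSessionTimeout/maxSessionTimeout in zoo.cfg, the server clamps a
    // client's requested session timeout into [2 * tickTime, 20 * tickTime].
    static int negotiatedSessionTimeout(int requestedMs, int tickTime) {
        int min = 2 * tickTime;
        int max = 20 * tickTime;
        return Math.max(min, Math.min(max, requestedMs));
    }

    public static void main(String[] args) {
        int tickTime = 2000; // value from the zoo.cfg above
        System.out.println("initLimit=10 -> " + limitMillis(10, tickTime) + " ms");
        System.out.println("syncLimit=5  -> " + limitMillis(5, tickTime) + " ms");
        System.out.println("requested 2000 ms -> " + negotiatedSessionTimeout(2000, tickTime) + " ms");
    }
}
```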
5.3 Configure myid in the data directory of each ZooKeeper server
Write 1 on the first machine, 2 on the second, and 3 on the third, matching server.1, server.2, and server.3 in zoo.cfg:
    # cd /kluter/zookeeper-3.4.11/data/
    # echo 1 > myid
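A common mistake is a myid that has no matching server.N line. zoo.cfg is a plain key=value file, so `java.util.Properties` can read it. The sketch below is hypothetical: `MyIdCheck` is not part of ZooKeeper. It lists the server.N entries and checks a myid value against them, using text passed in as strings instead of reading the real files.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class MyIdCheck {
    // Collects the server.N entries of a zoo.cfg: N -> "host:port1:port2".
    static Map<Integer, String> servers(String zooCfg) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(zooCfg)); // '#' lines are treated as comments
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<Integer, String> result = new TreeMap<>();
        for (String key : p.stringPropertyNames()) {
            if (key.startsWith("server.")) {
                result.put(Integer.parseInt(key.substring("server.".length())), p.getProperty(key).trim());
            }
        }
        return result;
    }

    // The myid file holds one number; it must match a server.N line, whose host is returned.
    static String hostFor(String zooCfg, String myidFileContent) {
        int id = Integer.parseInt(myidFileContent.trim());
        String entry = servers(zooCfg).get(id);
        if (entry == null) {
            throw new IllegalStateException("myid " + id + " has no server." + id + " line in zoo.cfg");
        }
        return entry.split(":")[0];
    }

    public static void main(String[] args) {
        String cfg = "tickTime=2000\n"
                + "server.1=zookeeper1:2888:3888\n"
                + "server.2=zookeeper2:2888:3888\n"
                + "server.3=zookeeper3:2888:3888\n";
        System.out.println(servers(cfg));
        // "2\n" is what `echo 2 > myid` writes on the second machine
        System.out.println("myid 2 belongs to " + hostFor(cfg, "2\n"));
    }
}
```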
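5.4 Start the ZooKeeper cluster with zkServer.sh
For connection errors I followed this article. The problem was mainly the hostnames in the configuration file:
https://www.cnblogs.com/xiaohua92/p/5460515.html
    # ./bin/zkServer.sh start
    # jps    # a running server shows up as QuorumPeerMain
    # ./bin/zkServer.sh status
If status prints "Error contacting service. It is probably not running.", the cluster is not healthy. A single started server reports this too, because a majority of the servers listed in zoo.cfg (2 of the 3 here) must be running. The official recommendation is 3 servers, or a larger odd number.
Stop the firewall, otherwise ports 2888 and 3888 are unreachable between the servers. The command below stops firewalld:
    # systemctl stop firewalld.service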
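To confirm the ports are reachable from another machine, a small TCP check can help. This is a sketch: `PortCheck` is a hypothetical helper. It opens a plain TCP connection with a timeout and does not speak the ZooKeeper protocol. Expect 2181 and 3888 to be open on every running server, while 2888 is opened only by the current leader.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    // True if a TCP connection to host:port succeeds within timeoutMs.
    static boolean reachable(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) { // refused, timed out, or unknown host
            return false;
        }
    }

    public static void main(String[] args) {
        String[] hosts = args.length > 0 ? args : new String[] {"zookeeper1", "zookeeper2", "zookeeper3"};
        int[] ports = {2181, 2888, 3888}; // client, leader/follower, election
        for (String host : hosts) {
            for (int port : ports) {
                System.out.printf("%s:%d %s%n", host, port,
                        reachable(host, port, 1000) ? "open" : "closed/unreachable");
            }
        }
    }
}
```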
5.5 Connect with the local command-line client
    # ./bin/zkCli.sh    # the prompt [zk: localhost:2181(CONNECTED) 0] means the client connected to the local server
    [zk: localhost:2181(CONNECTED) 0] help
    ZooKeeper -server host:port cmd args
        stat path [watch]
        set path data [version]
        ls path [watch]              # e.g. "ls /" lists the child nodes of the root znode
        delquota [-n|-b] path
        ls2 path [watch]
        setAcl path acl
        setquota -n|-b val path
        history
        redo cmdno
        printwatches on|off
        delete path [version]
        sync path
        listquota path
        rmr path
        get path [watch]             # e.g. "get /app1" shows the data of node app1
        create [-s] [-e] path data acl
        addauth scheme auth
        quit
        getAcl path
        close
        connect host:port
    [zk: localhost:2181(CONNECTED) 0] connect 10.10.77.192:2181    # connect to another server
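5.6 ZooKeeper's directory (znode) structure
For a detailed explanation of zkCli.sh, see: https://blog.csdn.net/xyang81/article/details/53053642

PS: "disconnect" here means the client closing its connection to the server. As long as the client stays connected, the EPHEMERAL nodes it created remain. Once the client closes its session (or the session times out), the server deletes its EPHEMERAL nodes automatically.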
5.6.1 Create the two kinds of nodes and add data
An ephemeral node (-e) is deleted automatically when the client that created it disconnects. This is what makes it useful for detecting cluster nodes going offline:
    [zk: localhost:2181(CONNECTED) 10] create -e /app-emphemeral 88888
    [zk: localhost:2181(CONNECTED) 10] quit    # after quitting, the ephemeral node is deleted automatically
    # ./zkCli.sh    # enter the command line again
    [zk: localhost:2181(CONNECTED) 0] ls /    # check the result
A node created without -e is persistent and is not deleted when the client quits. -s additionally makes the node sequential:
    [zk: localhost:2181(CONNECTED) 2] create -s /testZnode alsdkjfslkdfj
    [zk: localhost:2181(CONNECTED) 10] ls /
    [zookeeper, app1, testZnode0000000002]    # a sequence number was appended to testZnode automatically
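The suffix in testZnode0000000002 is a counter kept by the parent znode, formatted as 10 zero-padded digits. The sketch below reproduces that naming; the helper names are hypothetical. It shows why sorting the names as plain strings also sorts them by sequence number, which the lock in section 8 relies on.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SequentialNames {
    // Sequential nodes get a zero-padded 10-digit counter appended to the requested name.
    static String sequentialName(String prefix, int counter) {
        return prefix + String.format("%010d", counter);
    }

    // Recovers the counter from a name such as "testZnode0000000002".
    static int sequenceOf(String name) {
        return Integer.parseInt(name.substring(name.length() - 10));
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        for (int c : new int[] {10, 2, 9}) {
            names.add(sequentialName("testZnode", c));
        }
        Collections.sort(names); // zero padding makes string order equal numeric order
        System.out.println(names);
        System.out.println(sequenceOf("testZnode0000000002"));
    }
}
```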
5.6.2 Update a node's data
    set /testZnode0000000002 11111111
5.6.3 Watch for changes to a node's data
    [zk: 10.10.77.191:2181(CONNECTED) 10] get /testZnode0000000002 watch
    [zk: 10.10.77.192:2181(CONNECTED) 10] set /testZnode0000000002 xxxxxxx
After the data is updated on 192, the client on 191 is notified automatically. The watch fires only once, and it does not report the creation of child nodes:
    WATCHER::
    WatchedEvent state:SyncConnected type:NodeDataChanged path:/testZnode0000000002
5.6.4 Watch a node's children
    [zk: 10.10.77.191:2181(CONNECTED) 10] ls /testZnode0000000002 watch
    [zk: 10.10.77.192:2181(CONNECTED) 10] create /testZnode0000000002/crate 11111
After a child node is created on 192, the client on 191 is notified automatically. This watch reports children being created or deleted, not changes to the node's own data, and it also fires only once:
    type:NodeChildrenChanged path:/testZnode0000000002
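To make "fires only once" and the data/child distinction concrete, here is a toy in-memory model. `OneShotWatches` is hypothetical and is not the ZooKeeper client API. Data watches and child watches live in separate tables, and each watch is removed the moment it fires. The main method replays the zkCli session above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class OneShotWatches {
    private final Map<String, String> data = new HashMap<>();
    private final Set<String> dataWatches = new HashSet<>();  // armed by "get path watch"
    private final Set<String> childWatches = new HashSet<>(); // armed by "ls path watch"
    final List<String> events = new ArrayList<>();

    String get(String path, boolean watch) {
        if (watch) dataWatches.add(path);
        return data.get(path);
    }

    List<String> ls(String path, boolean watch) {
        if (watch) childWatches.add(path);
        List<String> kids = new ArrayList<>();
        for (String p : data.keySet()) {
            if (parentOf(p).equals(path)) kids.add(p.substring(p.lastIndexOf('/') + 1));
        }
        return kids;
    }

    void set(String path, String value) {
        data.put(path, value);
        // remove() is true only if a watch was armed, so each watch fires at most once
        if (dataWatches.remove(path)) events.add("NodeDataChanged " + path);
    }

    void create(String path, String value) {
        data.put(path, value);
        String parent = parentOf(path);
        if (childWatches.remove(parent)) events.add("NodeChildrenChanged " + parent);
    }

    private static String parentOf(String path) {
        int i = path.lastIndexOf('/');
        return i == 0 ? "/" : path.substring(0, i);
    }

    public static void main(String[] args) {
        OneShotWatches zk = new OneShotWatches();
        zk.create("/testZnode0000000002", "11111111");
        zk.get("/testZnode0000000002", true);              // get ... watch
        zk.set("/testZnode0000000002", "xxxxxxx");         // fires NodeDataChanged
        zk.set("/testZnode0000000002", "yyyyyyy");         // watch used up: no event
        zk.create("/testZnode0000000002/crate", "11111");  // a data watch does not see children
        zk.ls("/testZnode0000000002", true);               // ls ... watch
        zk.create("/testZnode0000000002/crate2", "22222"); // fires NodeChildrenChanged
        zk.set("/testZnode0000000002", "zzzzzzz");         // child watch ignores data changes
        System.out.println(zk.events);
    }
}
```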
5.6.5 Delete nodes
Delete a single node (it must have no children): delete /testZnode0000000002/testnode3
Delete a node together with all its children, recursively: rmr /testZnode0000000002
6. ZooKeeper Java API
ZooKeeper API documentation: https://zookeeper.apache.org/doc/r3.4.11/api/index.html
6.1. Option 1: import the ZooKeeper jars locally
If you have read this far, I assume you know Java and have developed with Eclipse.
In Eclipse, create a new project named zookeeper and add a lib directory to it.
Extract zookeeper-3.4.11.tar.gz, copy all 7 ZooKeeper jars into the project's lib directory, and add them to the build path.

Create a new class:

Test code:

To run the test, right-click and choose Run As > JUnit Test.
6.2. Option 2: let Maven import the dependencies
Create a Maven project for ZooKeeper. For how to create it, see: http://www.itdecent.cn/p/662a8291e0e3
Only the pom.xml is given here:

Note:
    process() is just a callback of the ZooKeeper client. When watching is enabled (the watch argument is set to true) and the watched node changes, the client calls process(). Watch events are delivered by a daemon thread, so when the main thread exits, that daemon thread exits too.
For daemon threads, see section 10 (daemon threads) of http://www.itdecent.cn/p/303507fc8b6d
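Why the test programs below sleep can be shown with plain Java threads. This sketch uses a hypothetical `DaemonDemo` class. It starts a background "listener" marked as a daemon, as the ZooKeeper client does with its event thread. The JVM stays up only while a non-daemon thread such as main is still running.

```java
public class DaemonDemo {
    // Starts a background listener marked as daemon, standing in for the client's event thread.
    static Thread startListener() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    Thread.sleep(100); // pretend to wait for watch events
                }
            } catch (InterruptedException e) {
                // interrupted: stop listening
            }
        }, "demo-event-thread");
        t.setDaemon(true); // daemon threads do not keep the JVM alive
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread listener = startListener();
        System.out.println("listener is daemon: " + listener.isDaemon());
        // Without this sleep, main would return at once and the JVM would exit,
        // taking the daemon listener (and any pending watch callbacks) with it.
        Thread.sleep(500);
        System.out.println("main exits now");
    }
}
```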

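Running the node-creation code threw an exception: MarshallingErrorException. The cause is that the third argument of create() (the ACL list) must not be null. A complete ACL has to be passed, for example Ids.OPEN_ACL_UNSAFE. The fix is:

The run then succeeds, but the new node Kluter does not appear when checked from the command line. The reason is that CreateMode was EPHEMERAL, so the node was deleted automatically when the client exited. With CreateMode.PERSISTENT, the node Kluter remains even after the client exits.
The code so far does not watch for node changes. Calling getChildren with the second argument set to true watches the child list of "/" (its direct children).

Testing the watch:
Run the code:

The sleep keeps the program from exiting. Next, delete the node kluter (created by the program) from the command line. The program's console prints:

However, deleting another node under the root afterwards triggers no event. This shows that the watch set during initialization fires only once. To keep receiving events, a new watch must be set each time an event arrives. Modify the initialization code as follows:

The rest of the code is the same as in 6.1 and is not repeated here.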
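The re-arming idea can be illustrated without a server. The sketch below is a toy model: `RearmingWatch` and `countEvents` are hypothetical, not ZooKeeper classes. The watcher slot is cleared before the callback runs. Without re-arming, only the first deletion is seen; a callback that calls getChildren again sees both. In the real client, the equivalent is calling getChildren("/", true) inside process().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RearmingWatch {
    private final List<String> children = new ArrayList<>();
    private Consumer<String> watcher; // at most one pending one-shot child watch

    // Plays the role of getChildren(path, true): returns the children and arms the watch.
    List<String> getChildren(Consumer<String> w) {
        watcher = w;
        return new ArrayList<>(children);
    }

    void create(String child) { children.add(child); fire(); }

    void delete(String child) { children.remove(child); fire(); }

    private void fire() {
        Consumer<String> w = watcher;
        watcher = null; // cleared before delivery: the watch is used up
        if (w != null) w.accept("NodeChildrenChanged");
    }

    // Deletes two children and counts how many events the watcher saw.
    static int countEvents(boolean rearm) {
        RearmingWatch root = new RearmingWatch();
        root.create("kluter");
        root.create("app1");
        List<String> seen = new ArrayList<>();
        Consumer<String> w = new Consumer<String>() {
            @Override
            public void accept(String event) {
                seen.add(event);
                if (rearm) root.getChildren(this); // set the next watch inside the callback
            }
        };
        root.getChildren(w); // the watch set during initialization
        root.delete("kluter");
        root.delete("app1");
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("without re-arming: " + countEvents(false) + " event(s)");
        System.out.println("with re-arming:    " + countEvents(true) + " event(s)");
    }
}
```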
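7. Clients dynamically detecting servers going online and offline

Architecture:
    1. The upper part is the server cluster. Each server creates a node in ZooKeeper when it comes online. Servers of the same kind create child nodes under the same parent node, using EPHEMERAL_SEQUENTIAL (ephemeral nodes with an auto-incrementing sequence number).
    2. When a user's client program starts, it fetches the child nodes under the parent node for that kind of server and sets a watch with getChildren. This tells it when servers go online and offline.
    3. Load balancing: when a user needs to run a query, the client first compares the data of each server node, which records that server's connection count. It picks the server with the fewest connections, sets its data to the connection count + 1, and sets it back to the count - 1 when the query finishes.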
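Step 3 is a least-connections choice. The sketch below assumes each server node's data is just its connection count as a decimal string. This is a hypothetical extension: the 7.1 code stores the hostname as node data instead. `LeastConnections` and its methods are illustrative names, and the byte arrays stand in for the results of getData.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class LeastConnections {
    // Picks the child node whose data (a decimal connection count) is smallest.
    static String pick(Map<String, byte[]> nodeData) {
        String best = null;
        int bestCount = Integer.MAX_VALUE;
        for (Map.Entry<String, byte[]> e : nodeData.entrySet()) {
            int count = Integer.parseInt(new String(e.getValue(), StandardCharsets.UTF_8).trim());
            if (count < bestCount) {
                best = e.getKey();
                bestCount = count;
            }
        }
        return best;
    }

    // New data to write back after taking (delta = +1) or releasing (delta = -1) a connection.
    static byte[] adjust(byte[] data, int delta) {
        int count = Integer.parseInt(new String(data, StandardCharsets.UTF_8).trim());
        return Integer.toString(count + delta).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, byte[]> nodes = new LinkedHashMap<>();
        nodes.put("server0000000001", "3".getBytes(StandardCharsets.UTF_8));
        nodes.put("server0000000002", "1".getBytes(StandardCharsets.UTF_8));
        nodes.put("server0000000003", "2".getBytes(StandardCharsets.UTF_8));
        String chosen = pick(nodes);
        byte[] updated = adjust(nodes.get(chosen), +1);
        System.out.println(chosen + " -> " + new String(updated, StandardCharsets.UTF_8));
    }
}
```

Several clients may update the same count concurrently, so a plain "+1" can lose updates. With the real API, one option is to read with getData(path, false, stat) and write back with setData(path, updated, stat.getVersion()). Retry on KeeperException.BadVersionException.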
7.1. Implementation
Code run on each server:
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class DistributedSrv {
    private static final String connectString = "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181";
    private static final int sessionTimeout = 2000;
    private static final String parentNode = "/servers";
    private ZooKeeper zkHandler = null;
    public void getConnect() throws Exception {
        //get zk connection; a server only registers itself, so the watcher just logs events
        zkHandler = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            public void process(WatchedEvent event) {
                System.out.println(event.getType() + "#############" + event.getPath());
            }
        });
    }
    public void registerServer(String hostname) throws Exception {
        //ephemeral children need an existing persistent parent node
        try {
            zkHandler.create(parentNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException e) {
            //already created by another server
        }
        //EPHEMERAL_SEQUENTIAL: removed automatically when this server's session ends
        String createPath = zkHandler.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname + " is online: " + createPath);
    }
    public void handleBusiness(String hostName) throws Exception {
        System.out.println(hostName + " start working......");
        Thread.sleep(Long.MAX_VALUE);
    }
    //args[0]: this server's hostname, e.g. zookeeper1
    public static void main(String[] args) throws Exception {
        DistributedSrv server = new DistributedSrv();
        server.getConnect();
        server.registerServer(args[0]);
        server.handleBusiness(args[0]);
    }
}
Code each client needs to run:
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;

public class DistributedCli {
    private static final String connectString = "10.10.77.191:2181,10.10.77.192:2181,10.10.77.193:2181";
    private static final int sessionTimeout = 2000;
    private static final String parentNode = "/servers";
    private volatile List<String> serverList; //volatile: the watcher thread swaps in a new list, other threads always see the latest one
    private ZooKeeper zkHandler = null;
    public void getConnect() throws Exception {
        //get zk connection
        zkHandler = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            public void process(WatchedEvent event) {
                System.out.println(event.getType() + "#############" + event.getPath());
                //only react to changes of the server list, not to connection-state events
                if (event.getType() != EventType.NodeChildrenChanged) {
                    return;
                }
                try {
                    getServerList();//node changed: update the list and re-register the one-shot watch
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
    public void getServerList() throws Exception {
        List<String> childNode = zkHandler.getChildren(parentNode, true); //register watch
        //create a local srvList
        ArrayList<String> srvList = new ArrayList<String>();
        for(String nodeName: childNode) {//read each server's data (its hostname)
            try {
                byte[] nodeData = zkHandler.getData(parentNode + "/" + nodeName, false, null);
                srvList.add(new String(nodeData));
            } catch (KeeperException.NoNodeException e) {
                //the server went offline between getChildren and getData; skip it
            }
        }
        //give to the object member
        serverList = srvList;
        System.out.println(srvList);
    }
    public void handleBusiness() throws Exception {
        System.out.println("client do business......");
        Thread.sleep(Long.MAX_VALUE);
    }
    public static void main(String[] args) throws Exception {
        DistributedCli client = new DistributedCli();
        client.getConnect();
        client.getServerList(); //first read, which also sets the first watch
        client.handleBusiness();
    }
}
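7.2. Package the program and run it with the java command
So far the code has been edited, compiled, and run inside Eclipse, which does not let us run it from a script or the command line on Linux. Instead, we can package the debugged program and run the package.
Taking the server side as an example, the packaging steps are:
    1. Run the debugged program once first; otherwise the jar export will not offer a matching launch configuration.
    2-4. Export the project as a runnable jar (in Eclipse: File > Export > Java > Runnable JAR file), select the launch configuration from step 1, and set the output file to zk.jar.
    5. Once packaging succeeds, the program can be run directly from the command line:
    # java -jar zk.jar zookeeper1
    -jar: followed by the zk.jar package just built
    zookeeper1: the program argument args[0]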
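8. Implementing a distributed shared lock with ZooKeeper
In the article on multithreaded synchronization (http://www.itdecent.cn/p/6f98f03430eb), we learned how to synchronize access among multiple threads. That approach does not work when multiple clients access the same resource on a server. The clients are separate processes, and ZooKeeper can be used to synchronize them.

Implementation steps:
1. When a program instance starts, it registers a node (EPHEMERAL_SEQUENTIAL) in ZooKeeper and watches that node's parent.
2. It fetches all program child nodes under the parent and compares their sequence numbers.
3. Sequence numbers increase monotonically, so the node with the smallest sequence number obtains the "lock" and accesses the resource. This is not a real lock, and not a thread synchronization lock. When it is done, it deletes its own node (which releases the lock) and registers a new node, whose sequence number is larger again.
4. The other program instances receive an event notification and check in ZooKeeper whether their own node now has the smallest sequence number. Only the smallest one obtains the lock.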
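The check in steps 2 to 4 can be isolated as a pure function. This is a sketch with a hypothetical name, `holdsLock`. It assumes, as in the code below, that every child shares the prefix "sub" plus the 10-digit suffix, so sorting the names as strings orders them by sequence number.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SmallestSequence {
    // True if myNode has the smallest sequence number among the lock's children.
    static boolean holdsLock(List<String> children, String myNode) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted); // same prefix + zero-padded suffix: string order = sequence order
        return !sorted.isEmpty() && sorted.get(0).equals(myNode);
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList("sub0000000007", "sub0000000005", "sub0000000006");
        System.out.println(holdsLock(children, "sub0000000005"));
        System.out.println(holdsLock(children, "sub0000000007"));
        // sub0000000005 releases (deletes itself) and re-registers as sub0000000008:
        List<String> after = Arrays.asList("sub0000000007", "sub0000000006", "sub0000000008");
        System.out.println(holdsLock(after, "sub0000000006"));
    }
}
```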
Pseudocode:
//getConnect()
//registerLock(/lock/app, EPHEMERAL_SEQUENTIAL)
//getLock(){ get the child nodes and check whether our own sequence number is the smallest; if so, return "lock acquired" }
//access the resource
//releaseLock(){ delete our own child node and create a new one }
//watcher:
//process(){ getLock() }
Complete code:
import static java.lang.System.out;

import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class DistributedCliLock {
    private static final int SESSION_TIMEOUT = 5000;
    private String hosts = "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181";
    private String groupNode = "locks";
    private String subNode = "sub";
    private ZooKeeper zk;
    private volatile String thisPath; //record the childNode path of itself
    /**
    *
    * connect to zookeeper and register the first lock node
    */
    public void connectZookeeper() throws Exception{
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher(){
            public void process(WatchedEvent event){
                try{
                    //check event type, only process children changes of /locks
                    if(event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)){
                        tryLock();
                    }
                }catch(Exception e){e.printStackTrace();}
            }
        });
        //ephemeral children need an existing persistent parent node /locks
        try {
            zk.create("/" + groupNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException e) {
            //already created by another client
        }
        // regist node at the first time
        thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        //wait for a while, in order to see the result
        Thread.sleep(new Random().nextInt(1000));
        //check right away whether our node is the smallest (not only when we are the single client);
        //otherwise clients that registered together could all wait for an event that never comes
        tryLock();
    }
    //get child nodes (re-registering the one-shot watch on the parent) and, if our node is the smallest, run the critical section
    private synchronized void tryLock() throws Exception {
        List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
        String thisNode = thisPath.substring(("/" + groupNode + "/").length());
        //sort and compare the id
        Collections.sort(childrenNodes);
        if(childrenNodes.indexOf(thisNode) == 0){
            //critical section and release the node
            doSomething();
            //re-create a node, id+1
            thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        }
    }
    //do with the critical section, at the end of the function, release lock
    private void doSomething() throws Exception {
        try {
            out.println("gain lock: " + thisPath);
            Thread.sleep(2000);
        } finally {
            out.println("finished: " + thisPath);
            zk.delete(this.thisPath, -1);
        }
    }
    public static void main(String[] args) throws Exception {
        DistributedCliLock dl = new DistributedCliLock();
        dl.connectZookeeper();
        Thread.sleep(Long.MAX_VALUE);
    }
}
