zookeeper

1. 概述

????一個(gè)第三方的分布式服務(wù)程序, 為別的分布式程序服務(wù),存儲(chǔ)的都是狀態(tài)數(shù)據(jù)

????數(shù)據(jù)保管(分布式中每一臺(tái)的狀態(tài)信息、數(shù)據(jù)信息)

????節(jié)點(diǎn)監(jiān)聽(監(jiān)聽掉線)

????分布式服務(wù)器主從選舉、掉線分配管理

????配置管理(管理分布式服務(wù)器的通用配置參數(shù)、文件,用于配置更新)

????分布式共享鎖


2. 基本要求

????因此zookeeper要求自身高可靠的分布式集群(奇數(shù)個(gè)節(jié)點(diǎn)最少一臺(tái),一般三臺(tái),半數(shù)以上節(jié)點(diǎn)存活,zk就能正常服務(wù))


3. zookeeper的集群角色分配原理

????三臺(tái)機(jī)器如圖,需要配置id分別為1、2、3,配置文件決定zookeeper集群有哪些備選節(jié)點(diǎn)????

????首先啟動(dòng)mini1,先在集群中投票,發(fā)現(xiàn)只有自己?jiǎn)?dòng)就投票給自己,此時(shí)mini1 一票

????再啟動(dòng)mini2,在集群中mini1發(fā)現(xiàn)mini2上線,給mini2投一票;mini2先給自己投一票,再給mini1投一票。此時(shí)mini1有2票,mini2也有2票

????mini1、2各有兩票,假設(shè)機(jī)制是根據(jù)id大小來決定,由于mini2的id>mini1的id,那么mini2和mini1都會(huì)投票給mini2。此時(shí)mini2有4票,mini1有2票,那么mini2做為leader,mini1作為follower

????最后mini3上線,發(fā)現(xiàn)mini2已經(jīng)是一個(gè)leader,自己就作為follower。

????當(dāng)有數(shù)據(jù)提交到follower的時(shí)候,follower都會(huì)將數(shù)據(jù)直接轉(zhuǎn)發(fā)給leader,再由leader分發(fā)給集群中的follower


4. zookeeper官網(wǎng):https://zookeeper.apache.org/


5. 配置zookeeper

5.1 配置文件(需要將zoo_sample.cfg改成zoo.cfg)

????/conf/zoo.cfg 配置詳解:https://www.cnblogs.com/xiohao/p/5541093.html


5.2 配置實(shí)例

????(這個(gè)配置在zookeeper集群中每一臺(tái)都相同) 注意mini1,mini2, mini3需要映射ip,如果不想做主機(jī)名-IP映射,就改成主機(jī)ip

對(duì)配置文件的添加或者修改用粗體標(biāo)明了:

????# The number of milliseconds of each tick

????tickTime=2000

????# The number of ticks that the initial

????# synchronization phase can take

????initLimit=10

????# The number of ticks that can pass between

????# sending a request and getting an acknowledgement

????syncLimit=5

????# the directory where the snapshot is stored.

????# do not use /tmp for storage, /tmp here is just

????# example sakes.

????dataDir=/kluter/zookeeper-3.4.11/data

????# the port at which the clients will connect

????clientPort=2181

????# the maximum number of client connections.

????# increase this if you need to handle more clients

????#maxClientCnxns=60

????#

????# Be sure to read the maintenance section of the

????# administrator guide before turning on autopurge.

????#

????# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance

????#

????# The number of snapshots to retain in dataDir

????#autopurge.snapRetainCount=3

????# Purge task interval in hours

????# Set to "0" to disable auto purge feature

????#autopurge.purgeInterval=1


????#configure myid and port1/port2

????#port1:default lead/follower port 2888

????#prot2:default vote port 3888

????server.1=zookeeper1:2888:3888

????server.2=zookeeper2:2888:3888

????server.3=zookeeper3:2888:3888


5.3 分別進(jìn)入每一臺(tái)zookeeper數(shù)據(jù)目錄配置myid

????第一臺(tái)為1, 第二臺(tái)為2, 第三臺(tái)為3

????# cd /kluter/zookeeper-3.4.11/data/

????# echo 1 > myid


5.4 運(yùn)行zookeeper集群 zkServer.sh

????這里參考一偏連接異常的文章,主要是配置文件的主機(jī)名問題

????https://www.cnblogs.com/xiaohua92/p/5460515.html

????#./bin/zkServer.sh start

????#jps

????#./bin/zkServer.sh status

????出現(xiàn)Error contacting service. It is probably not running. 說明集群狀態(tài)異常。一臺(tái)zookeeper也會(huì)異常,因?yàn)橹辽傩枰?臺(tái),最好是官方的3臺(tái)或3臺(tái)以上的奇數(shù)臺(tái)

????需要停止iptables服務(wù),否則2888、3888端口不通

????#systemctl stop firewalld.service


5.5 命令行本地客戶端連接

????#/bin/zkCli.sh#顯示[zk: localhost:2181(CONNECTED) 0] 表示命令行連接本機(jī)成功

????[zk: localhost:2181(CONNECTED) 0]help

????ZooKeeper -server host:port cmd args

? ? ? ? ????stat path [watch]

? ? ? ? ????set path data [version]

????ls path[watch]#ls /? ? ? ? ? ? ? ? ? ? ?//查看根znode下面的節(jié)點(diǎn)

? ? ? ? ????delquota [-n|-b] path

? ? ? ? ????ls2 path [watch]

? ? ? ? ????setAcl path acl

? ? ? ? ????setquota -n|-b val path

? ? ? ? ????history

? ? ? ? ????redo cmdno

? ? ? ? ????printwatches on|off

? ? ? ? ????delete path [version]

? ? ? ? ????sync path

? ? ? ? ????listquota path

? ? ? ? ????rmr path

????get path[watch]#get /app1 查看節(jié)點(diǎn)app1的數(shù)據(jù)

? ? ? ? ????create [-s] [-e] path data acl

? ? ? ? ????addauth scheme auth

? ? ? ? ????quit

? ? ? ? ????getAcl path

? ? ? ? ????close

? ? ? ? ????connect host:port

????[zk: localhost:2181(CONNECTED) 0]connect 10.10.77.192:2181#連接另一臺(tái)


5.6 zookeeper目錄結(jié)構(gòu)

????參考zkCli.sh詳解:https://blog.csdn.net/xyang81/article/details/53053642

PS:這里的斷開連接指的是客戶端斷開與服務(wù)器的連接,客戶端不斷開連接,創(chuàng)建的EPHEMERAL會(huì)一直存在??蛻舳藬嚅_了,EPHEMERAL節(jié)點(diǎn)會(huì)被服務(wù)器自動(dòng)刪除


5.6.1 創(chuàng)建兩種類型節(jié)點(diǎn)并添加數(shù)據(jù)

????短暫連接-e(ephemeral)用于集群中節(jié)點(diǎn)掉線

????[zk: localhost:2181(CONNECTED) 10]create -e /app-emphemeral 88888

????[zk: localhost:2181(CONNECTED) 10]quit #退出,那么短暫連接的節(jié)點(diǎn)自己刪除了

????#./zkCli.sh #再次進(jìn)入命令行模式

????[zk: localhost:2181(CONNECTED) 0]ls /? ? ? 查看驗(yàn)證結(jié)果

????反之用-s則退出后不會(huì)刪除節(jié)點(diǎn)

????[zk: localhost:2181(CONNECTED) 2] create -s /testZnode alsdkjfslkdfj

????[zk: localhost:2181(CONNECTED) 10] ls /

????[zookeeper, app1, testZnode0000000002] #發(fā)現(xiàn)自動(dòng)在testZnode增加了序號(hào)


5.6.2 更新節(jié)點(diǎn)數(shù)據(jù)

????set /testZnode0000000002 11111111


5.6.3 監(jiān)聽節(jié)點(diǎn)數(shù)據(jù)更改

????[zk: 10.10.77.191:2181(CONNECTED) 10] get /testZnode0000000002 watch

????[zk: 10.10.77.192:2181(CONNECTED) 10] set /testZnode0000000002 xxxxxxx

????192上更新數(shù)據(jù)后,191上自動(dòng)收到消息,但是只生效一次,而且監(jiān)聽不到子節(jié)點(diǎn)創(chuàng)建

????WATCHER::

????WatchedEvent state:SyncConnected type:NodeDataChanged path:/testZnode0000000002


5.6.4 監(jiān)聽子節(jié)點(diǎn)【數(shù)據(jù)】

????[zk: 10.10.77.191:2181(CONNECTED) 10]ls /testZnode00000000002 watch

????[zk: 10.10.77.192:2181(CONNECTED) 10] create /testZnode00000000002/crate 11111

????192上創(chuàng)建子節(jié)點(diǎn)后,191上自動(dòng)收到消息,而且只有數(shù)據(jù)更新會(huì)收到消息

????type:NodeChildrenChanged path:/testZnode0000000002


5.6.5 節(jié)點(diǎn)刪除

????刪除一個(gè)節(jié)點(diǎn) delete /testZnode0000000002/testnode3

????遞歸刪除根節(jié)點(diǎn) rmr /testZnode0000000002


6. zookeeper的java api

zookeeper API文檔地址:https://zookeeper.apache.org/doc/r3.4.11/api/index.html

6.1. 方式一:本地導(dǎo)入zookeeper包

看到這里,我假設(shè)大家都有java基礎(chǔ)并且使用eclipse進(jìn)行過開發(fā)

在eclipse中新建project:zookeeper,并增加一個(gè)lib目錄

解壓zookeeper-3.4.11.tar.gz,將zookeeper中的7個(gè)jar包全部導(dǎo)入eclipse的lib目錄中并且bulild path

new一個(gè)class:

測(cè)試代碼:

測(cè)試方式就是右鍵run Junit test


6.2. 方式二:使用maven自動(dòng)導(dǎo)包

新建一個(gè)zookeeper的maven project,創(chuàng)建過程參考:http://www.itdecent.cn/p/662a8291e0e3

這里只給出maven的pom.xml:


初始化

注意:

? ? process()方法只是zkCli的一個(gè)回調(diào)函數(shù),如果設(shè)置監(jiān)聽為true,當(dāng)監(jiān)聽的節(jié)點(diǎn)發(fā)生了變化,就會(huì)調(diào)用process回調(diào)函數(shù)。監(jiān)聽功能是一個(gè)Daemon守護(hù)線程,當(dāng)主線程退出,守護(hù)線程也會(huì)退出。

關(guān)于守護(hù)線程,請(qǐng)參看:http://www.itdecent.cn/p/303507fc8b6d 的第10. 守護(hù)線程


創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn)(參數(shù)錯(cuò)誤)

運(yùn)行創(chuàng)建節(jié)點(diǎn)的代碼發(fā)生Excepption:MarshallingErrorException。原因是第三個(gè)參數(shù)不能為null,必須寫一個(gè)完整的。修改如下:

創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn)

運(yùn)行雖然成功了,但是去命令行查看創(chuàng)建的新節(jié)點(diǎn)Kluter并不存在,原因是CreateMode使用的是EPHEMERAL,客戶端終止后就自動(dòng)刪除了。改為CreateMode.PERSISTENT,即便客戶端終止,節(jié)點(diǎn)Kluter也不會(huì)消失了。


之前的代碼并不能監(jiān)聽節(jié)點(diǎn)的變化,下面getChildren方法,第二個(gè)參數(shù)設(shè)置為true可以監(jiān)聽"/"下的所有節(jié)點(diǎn)。

獲取根路徑下節(jié)點(diǎn),并設(shè)置監(jiān)聽

測(cè)試監(jiān)聽:

????運(yùn)行代碼:

休眠是為了讓程序不退出。接著再命令行刪除程序創(chuàng)建的節(jié)點(diǎn)kluter,這樣程序的console會(huì)打?。?/p>

監(jiān)聽到的時(shí)間類型和路徑

但是,再刪除一個(gè)根下的節(jié)點(diǎn),并不會(huì)有監(jiān)聽事件發(fā)生,說明初始化中的監(jiān)聽事件只會(huì)發(fā)生一次,如果希望持續(xù)發(fā)生,則每次監(jiān)聽到事件之后需要再設(shè)置下一次的監(jiān)聽。修改初始化代碼如下:

能夠持續(xù)監(jiān)聽


余下代碼和6.1. 中的代碼相同,不再贅述。


7. 客戶端動(dòng)態(tài)感知集群中的服務(wù)器上下線

架構(gòu)圖

架構(gòu)解析:

? ? 1. 上半部分是服務(wù)器集群,每臺(tái)服務(wù)器上線的時(shí)候都需要到zookeeper上去創(chuàng)建節(jié)點(diǎn),同一類服務(wù)器在同一個(gè)根節(jié)點(diǎn)下創(chuàng)建子節(jié)點(diǎn)(使用EPHEMERAL_SEQUENTIAL臨時(shí)序列化增長(zhǎng)的模式)。

? ? 2. 用戶的客戶端程序啟動(dòng)時(shí),每次到zookeeper去獲取某一類服務(wù)器根節(jié)點(diǎn)下面的子節(jié)點(diǎn),并getChildren做監(jiān)聽,就能知道服務(wù)器的online和offline

? ? 3. 負(fù)載均衡:當(dāng)有用戶需要做業(yè)務(wù)查詢時(shí),先比較每個(gè)服務(wù)器節(jié)點(diǎn)的data(data中記錄的是連接數(shù)),選取連接數(shù)最小的去做查詢,并且set data連接數(shù)+1,查詢完成后設(shè)置連接數(shù)-1.


7.1. 程序?qū)崿F(xiàn)

每一臺(tái)server端運(yùn)行代碼:

public class DistributedSrv {

????private static final String connectString = "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181";

????private static final int sessionTimeout = 2000;

????private static final String parentNode = "/servers";

????private ZooKeeper zkHandler = null;

????public void getConnect() throws Exception {

????????//get zk connection

????????zkHandler = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

????????????public void process(WatchedEvent event) {

????????????????System.out.println(event.getType() + "#############" + event.getPath());

????????????????try {

????????????????????zkHandler.getChildren("/", true);

????????????????} catch (Exception e) {

????????????????????e.printStackTrace();

????????????????}

????????????}

????????});

????}


????public void registerServer(String hostname) throws Exception {

????????String createPath = zkHandler.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

????????System.out.println(hostname + "is online" + createPath);

????}


????public void handleBusiness(String hostName) throws Exception {

????????System.out.println(hostName + " start working......");

????????Thread.sleep(Long.MAX_VALUE);

????}

}

每一臺(tái)客戶端需要運(yùn)行:

public class DistributedCli {

????private static final String connectString = "10.10.77.191:2181,10.10.77.192:2181,10.10.77.193:2181";

????private static final int sessionTimeout = 2000;

????private static final String parentNode = "/servers";

????private volatile List<String> serverList; //volatile: muti-thread do with the serverList with no copy

????private ZooKeeper zkHandler = null;


????public void getConnect() throws Exception {

????????//get zk connection

????????zkHandler = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

????????????public void process(WatchedEvent event) {

????????????????System.out.println(event.getType() + "#############" + event.getPath());

????????????????try {

????????????????????getServerList();//node changed, update node list

????????????????} catch (Exception e) {

????????????????????e.printStackTrace();

????????????????}

????????????}

????????});

????}


????public void getServerList() throws Exception {

????????List<String> childNode = zkHandler.getChildren(parentNode, true); //register watch

????????//create a local srvList

????????ArrayList<String> srvList = new ArrayList<String>();

????????for(String nodeName: childNode) {//show nodeName

????????????byte[] nodeData = zkHandler.getData(parentNode + "/" + nodeName, false, null);

????????????srvList.add(new String(nodeData));

????????}

????????//give to the object member

????????serverList = srvList;

????????System.out.println(srvList);

????}


????public void handleBusiness() throws Exception {

????????System.out.println("client do business......");

????????Thread.sleep(Long.MAX_VALUE);

????}

}


7.2. 打包并用java命令運(yùn)行程序

由于我們編輯、編譯、運(yùn)行代碼是在eclipse中完成的,但是這樣并不能夠在linux環(huán)境下用腳本或者命令來執(zhí)行。我們可以通過打包的方式來運(yùn)行調(diào)試好的程序:

下面以server端為例,打包步驟:

? ? 1. 先運(yùn)行一下調(diào)試好的程序,否則添加jar包不會(huì)又相應(yīng)的選項(xiàng)

? ? 2.?

? ? 3.?

? ? 4.?

? ? 5. 打包成功后,可以在命令行直接運(yùn)行了

????-jar: 后面跟剛才打包好的zk.jar包

? ? zookeeper1:是程序中的參數(shù)arg[0]


8. 使用Zookeeper實(shí)現(xiàn)分布式共享鎖的功能

在多線程同步中:http://www.itdecent.cn/p/6f98f03430eb, 我們學(xué)習(xí)了如何在多個(gè)線程間保證同步訪問。但如果遇到多個(gè)客戶端訪問某個(gè)服務(wù)器同一個(gè)資源的情況就不合適了,多個(gè)客戶端相當(dāng)于是多個(gè)不同的進(jìn)程,進(jìn)程間想要同步,這里就可以使用zookeeper來實(shí)現(xiàn)。

實(shí)現(xiàn)步驟:

1. 程序節(jié)點(diǎn)啟動(dòng)時(shí),到zk上注冊(cè)一個(gè)節(jié)點(diǎn)(EPHEMERAL_SEQUENTIAL),并監(jiān)聽其父節(jié)點(diǎn)

2. 獲取父節(jié)點(diǎn)下的所有程序子節(jié)點(diǎn),比較序號(hào)的大小

3. 因?yàn)樾蛱?hào)是單調(diào)遞增的,讓序號(hào)最小的獲取到“鎖”(不是一個(gè)真正的鎖,不是線程同步鎖),去訪問資源,訪問完后,刪除自己的節(jié)點(diǎn)(相當(dāng)于釋放鎖),并且重新注冊(cè)一個(gè)新的節(jié)點(diǎn),序號(hào)又+1增長(zhǎng)

4. 其他程序節(jié)點(diǎn)會(huì)收到時(shí)間通知,則可以去zk上看是否節(jié)點(diǎn)序號(hào)是最小的,最小的才能獲取鎖

偽代碼:

//getConnect()

//registerLock(/lock/app.EPHEMERAL_SEQUENTIAL)

//getLock(){獲取子節(jié)點(diǎn),比較自己的序號(hào)是否是最小的,如果是,則返回鎖獲取成功}

//訪問資源

//釋放鎖releaseLock(){刪除自己的子節(jié)點(diǎn),并創(chuàng)建一個(gè)新的節(jié)點(diǎn)}


//監(jiān)聽器

process(){getLock()}

完整代碼:

public class DistributedCliLock {

????private static final int SESSION_TIMEOUT = 5000;

????private String hosts = "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181";

????private String groupNode = "locks";

????private String subNode = "sub";

????private boolean haveLock = false;

????private ZooKeeper zk;

????private volatile String thisPath; //record the childNode path of itself


????/**

????*

????* connect to zookeeper

????*/

????public void connectZookeeper() throws Exception{

????????zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher(){

????????????public void process(WatchedEvent event){

????????????????try{

????????????????????//check event type, only process children Node

????????????????????if(event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)){

????????????????????????//get child node, and watch parent node

????????????????????????List<String> childrenNodes = zk.getChildren("/" + groupNode, true);

????????????????????????String thisNode = thisPath.substring(("/" + groupNode + "/").length());

????????????????????????//sort and compare the id

????????????????????????Collections.sort(childrenNodes);

????????????????????????if(childrenNodes.indexOf(thisNode) == 0){

????????????????????????????//critical section and release the node

????????????????????????????doSomething();

? ? ? ? ? ? ? ? ? ? ? ? ? ? //re-create a node, id+1

????????????????????????????thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

????????????????????????}

????????????????????}

????????????????}catch(Exception e){e.printStackTrace();}

????????????}

????????});

????????// regist node at the first time

????????thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

????????//wait for a while, in order to see the result

????????Thread.sleep(new Random().nextInt(1000));

????????//get all the sub nodes

????????List<String> childrenNodes = zk.getChildren("/" + groupNode, true);

????????//only one client, you can do something right now

????????if(childrenNodes.size() == 1){

????????????doSomething();

????????????thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

????????}

????}


????//do with the critical section, at the end of the function, release lock

????private void doSomething() throws Exception {

????????try {

????????????out.println("gain lock: " + thisPath);

????????????Thread.sleep(2000);

????????} finally {

????????????out.println("finished: " + thisPath);

????????????zk.delete(this.thisPath, -1);

????????}

????}


????public static void main(String[] args) throws Exception {

????????DistributedCliLock dl = new DistributedCliLock();

????????dl.connectZookeeper();

????????Thread.sleep(Long.MAX_VALUE);

????}

}


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、Nginx/keepalived/lvs的介紹 1.nginx 1.1.nginx簡(jiǎn)介 Nginx是一個(gè)自由、...
    Java幫幫閱讀 1,625評(píng)論 0 5
  • ... 一、相關(guān)概念 中間件:為分布式系統(tǒng)提供協(xié)調(diào)服務(wù)的組件,如專門用于計(jì)算服務(wù)的機(jī)器就是一個(gè)計(jì)算型中間件,還有專...
    帥可兒妞閱讀 554評(píng)論 0 0
  • Zookeeper 1. Zookeeper概念簡(jiǎn)介: Zookeeper是一個(gè)分布式協(xié)調(diào)服務(wù);就是為用戶的分布式...
    hahaxiou閱讀 733評(píng)論 3 1
  • 戀愛,是選擇一個(gè)適合自己的人,而不是把一個(gè)不適合自己的人改造成適合自己的模樣。
    66imaginist閱讀 283評(píng)論 0 0
  • 一、構(gòu)造函數(shù) 說構(gòu)造函數(shù)的概念之前我們應(yīng)該先了解類的概念,類與對(duì)象的概念息息相關(guān),我們?cè)谥v對(duì)象的概念的時(shí)候說過,一...
    趁年輕拼一拼閱讀 173評(píng)論 0 0

友情鏈接更多精彩內(nèi)容