zookeeper集群
配置多個(gè)實(shí)例共同構(gòu)成一個(gè)集群對(duì)外提供服務(wù)以達(dá)到水平擴(kuò)展的目的,每個(gè)服務(wù)器上的數(shù)據(jù)是相同的,每一個(gè)服務(wù)器均可以對(duì)外提供讀和寫的服務(wù),這點(diǎn)和redis是相同的,即對(duì)客戶端來講每個(gè)服務(wù)器都是平等的。

這篇主要分析leader的選擇機(jī)制,zookeeper提供了三種方式:
LeaderElection
AuthFastLeaderElection
FastLeaderElection
默認(rèn)的算法是FastLeaderElection,所以這篇主要分析它的選舉機(jī)制。
選擇機(jī)制中的概念
服務(wù)器ID
比如有三臺(tái)服務(wù)器,編號(hào)分別是1,2,3。
編號(hào)越大在選擇算法中的權(quán)重越大。
數(shù)據(jù)ID
服務(wù)器中存放的最大數(shù)據(jù)ID.
值越大說明數(shù)據(jù)越新,在選舉算法中數(shù)據(jù)越新權(quán)重越大。
邏輯時(shí)鐘
或者叫投票的次數(shù),同一輪投票過程中的邏輯時(shí)鐘值是相同的。每投完一次票這個(gè)數(shù)據(jù)就會(huì)增加,然后與接收到的其它服務(wù)器返回的投票信息中的數(shù)值相比,根據(jù)不同的值做出不同的判斷。
選舉狀態(tài)
LOOKING,競(jìng)選狀態(tài)。
FOLLOWING,隨從狀態(tài),同步leader狀態(tài),參與投票。
OBSERVING,觀察狀態(tài),同步leader狀態(tài),不參與投票。
LEADING,領(lǐng)導(dǎo)者狀態(tài)。
選舉消息內(nèi)容
在投票完成后,需要將投票信息發(fā)送給集群中的所有服務(wù)器,它包含如下內(nèi)容。
服務(wù)器ID
數(shù)據(jù)ID
邏輯時(shí)鐘
選舉狀態(tài)
選舉流程圖
因?yàn)槊總€(gè)服務(wù)器都是獨(dú)立的,在啟動(dòng)時(shí)均從初始狀態(tài)開始參與選舉,下面是簡(jiǎn)易流程圖。

選舉狀態(tài)圖
描述Leader選擇過程中的狀態(tài)變化,這是假設(shè)全部實(shí)例中均沒有數(shù)據(jù),假設(shè)服務(wù)器啟動(dòng)順序分別為:A,B,C。

源碼分析
QuorumPeer
主要看這個(gè)類,只有LOOKING狀態(tài)才會(huì)去執(zhí)行選舉算法。每個(gè)服務(wù)器在啟動(dòng)時(shí)都會(huì)選擇自己做為領(lǐng)導(dǎo),然后將投票信息發(fā)送出去,循環(huán)一直到選舉出領(lǐng)導(dǎo)為止。
public void run() {
//.......
try {
while (running) {
switch (getPeerState()) {
case LOOKING:
if (Boolean.getBoolean("readonlymode.enabled")) {
//...
try {
//投票給自己...
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
//...
} finally {
//...
}
} else {
try {
//...
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
//...
}
}
break;
case OBSERVING:
//...
break;
case FOLLOWING:
//...
break;
case LEADING:
//...
break;
}
}
} finally {
//...
}
}
FastLeaderElection
它是zookeeper默認(rèn)提供的選舉算法,核心方法如下:具體的可以與本文上面的流程圖對(duì)照。
public Vote lookForLeader() throws InterruptedException {
//...
try {
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
//給自己投票
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
//將投票信息發(fā)送給集群中的每個(gè)服務(wù)器
sendNotifications();
//循環(huán),如果是競(jìng)選狀態(tài)一直到選舉出結(jié)果
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
//沒有收到投票信息
if(n == null){
if(manager.haveDelivered()){
sendNotifications();
} else {
manager.connectAll();
}
//...
}
//收到投票信息
else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
switch (n.state) {
case LOOKING:
// 判斷投票是否過時(shí),如果過時(shí)就清除之前已經(jīng)接收到的信息
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
//更新投票信息
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
//發(fā)送投票信息
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
//忽略
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
//更新投票信息
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//判斷是否投票結(jié)束
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
//忽略
break;
case FOLLOWING:
case LEADING:
//如果是同一輪投票
if(n.electionEpoch == logicalclock.get()){
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//判斷是否投票結(jié)束
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
//記錄投票已經(jīng)完成
outofelection.put(n.sid, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, IGNOREVALUE)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
//忽略
break;
}
} else {
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
return null;
} finally {
//...
}
}
判斷是否已經(jīng)勝出
默認(rèn)是采用投票數(shù)大于半數(shù)則勝出的邏輯。
選舉流程簡(jiǎn)述
目前有5臺(tái)服務(wù)器,每臺(tái)服務(wù)器均沒有數(shù)據(jù),它們的編號(hào)分別是1,2,3,4,5,按編號(hào)依次啟動(dòng),它們的選擇舉過程如下:
服務(wù)器1啟動(dòng),給自己投票,然后發(fā)投票信息,由于其它機(jī)器還沒有啟動(dòng)所以它收不到反饋信息,服務(wù)器1的狀態(tài)一直屬于Looking。
服務(wù)器2啟動(dòng),給自己投票,同時(shí)與之前啟動(dòng)的服務(wù)器1交換結(jié)果,由于服務(wù)器2的編號(hào)大所以服務(wù)器2勝出,但此時(shí)投票數(shù)沒有大于半數(shù),所以兩個(gè)服務(wù)器的狀態(tài)依然是LOOKING。
服務(wù)器3啟動(dòng),給自己投票,同時(shí)與之前啟動(dòng)的服務(wù)器1,2交換信息,由于服務(wù)器3的編號(hào)最大所以服務(wù)器3勝出,此時(shí)投票數(shù)正好大于半數(shù),所以服務(wù)器3成為領(lǐng)導(dǎo)者,服務(wù)器1,2成為小弟。
服務(wù)器4啟動(dòng),給自己投票,同時(shí)與之前啟動(dòng)的服務(wù)器1,2,3交換信息,盡管服務(wù)器4的編號(hào)大,但之前服務(wù)器3已經(jīng)勝出,所以服務(wù)器4只能成為小弟。
服務(wù)器5啟動(dòng),后面的邏輯同服務(wù)器4成為小弟。