前言:在Hadoop 1.x版本,HDFS集群的NameNode一直存在單點(diǎn)故障問題:集群只存在一個(gè)NameNode節(jié)點(diǎn),它維護(hù)了HDFS所有的元數(shù)據(jù)信息,當(dāng)該節(jié)點(diǎn)所在服務(wù)器宕機(jī)或者服務(wù)不可用,整個(gè)HDFS集群都將處于不可用狀態(tài),極大限制了HDFS在生產(chǎn)環(huán)境的應(yīng)用場(chǎng)景。直到Hadoop 2.0版本才提出了高可用 (High Availability, HA) 解決方案,并且經(jīng)過多個(gè)版本的迭代更新,已經(jīng)廣泛應(yīng)用于生產(chǎn)環(huán)境。
解決方案:在同一個(gè)HDFS集群,運(yùn)行兩個(gè)互為主備的NameNode節(jié)點(diǎn)。一臺(tái)為主Namenode節(jié)點(diǎn),處于Active狀態(tài),一臺(tái)為備NameNode節(jié)點(diǎn),處于Standby狀態(tài)。其中只有Active NameNode對(duì)外提供讀寫服務(wù),Standby NameNode會(huì)根據(jù)Active NameNode的狀態(tài)變化,在必要時(shí)切換成Active狀態(tài)。
【NameNode HA架構(gòu)圖】
ZKFC
ZKFC即ZKFailoverController,作為獨(dú)立進(jìn)程存在,負(fù)責(zé)控制NameNode的主備切換,ZKFC會(huì)監(jiān)測(cè)NameNode的健康狀況,當(dāng)發(fā)現(xiàn)Active NameNode出現(xiàn)異常時(shí)會(huì)通過Zookeeper集群進(jìn)行一次主備選舉,完成Active和Standby狀態(tài)的切換;
HealthMonitor
定時(shí)調(diào)用NameNode的HAServiceProtocol RPC接口(monitorHealth和getServiceStatus),監(jiān)控NameNode的健康狀態(tài)并向ZKFC反饋;
ActiveStandbyElector
接收Z(yǔ)KFC的選舉請(qǐng)求,通過Zookeeper自動(dòng)完成主備選舉,選舉完成后回調(diào)ZKFC的主備切換方法對(duì)NameNode進(jìn)行Active和Standby狀態(tài)的切換;
JouranlNode集群
共享存儲(chǔ)系統(tǒng),負(fù)責(zé)存儲(chǔ)HDFS的元數(shù)據(jù),Active NameNode(寫入)和Standby NameNode(讀取)通過共享存儲(chǔ)系統(tǒng)實(shí)現(xiàn)元數(shù)據(jù)同步,在主備切換過程中,新的Active NameNode必須確保元數(shù)據(jù)同步完成才能對(duì)外提供服務(wù);
【ZKFC工作原理】
ZKFailoverController在啟動(dòng)時(shí)同時(shí)會(huì)初始化HealthMonitor和ActiveStandbyElector服務(wù),同時(shí)也會(huì)向HealthMonitor和ActiveStandbyElector注冊(cè)相應(yīng)的回調(diào)方法:
private int doRun(String[] args) throws Exception {
try {
initZK(); //初始化ActiveStandbyElector服務(wù)
} catch (KeeperException ke) {
LOG.error("Unable to start failover controller. Unable to connect "
+ "to ZooKeeper quorum at " + zkQuorum + ". Please check the "
+ "configured value for " + ZK_QUORUM_KEY + " and ensure that "
+ "ZooKeeper is running.", ke);
return ERR_CODE_NO_ZK;
}
......
try {
initRPC();
initHM(); //初始化HealthMonitor服務(wù)
startRPC();
mainLoop();
} catch (Exception e) {
LOG.error("The failover controller encounters runtime error: ", e);
throw e;
} finally {
rpcServer.stopAndJoin();
elector.quitElection(true);
healthMonitor.shutdown();
healthMonitor.join();
}
return 0;
}
一. 狀態(tài)監(jiān)控
HealthMonitor檢測(cè)NameNode的兩類狀態(tài),HealthMonitor.State和HealthMonitor.HAServiceStatus。在程序上啟動(dòng)一個(gè)線程循環(huán)調(diào)用NameNode的HAServiceProtocol RPC接口的方法來檢測(cè)NameNode 的狀態(tài),并將狀態(tài)的變化通過回調(diào)的方式來通知ZKFailoverController。
HealthMonitor.State包括:
INITIALIZING:The health monitor is still starting up;
SERVICE_NOT_RESPONDING:The service is not responding to health check RPCs;
SERVICE_HEALTHY:The service is connected and healthy;
SERVICE_UNHEALTHY:The service is running but unhealthy;
HEALTH_MONITOR_FAILED:The health monitor itself failed unrecoverably and can no longer provide accurate information;
HealthMonitor.HAServiceStatus包括:
INITIALIZING:NameNode正在啟動(dòng)中;
ACTIVE:當(dāng)前NameNode角色為Active;
STANDBY:當(dāng)前NameNode角色為Standby;
STOPPING:NameNode已經(jīng)停止運(yùn)行;
當(dāng)HealthMonitor檢測(cè)到NameNode的健康狀態(tài)或角色狀態(tài)發(fā)生變化時(shí),ZKFC會(huì)根據(jù)狀態(tài)的變化決定是否需要進(jìn)行主備選舉。
二. 主備選舉
HealthMonitor.State狀態(tài)變化導(dǎo)致的不同后續(xù)措施:
/**
* Check the current state of the service, and join the election
* if it should be in the election.
*/
private void recheckElectability() {
// Maintain lock ordering of elector -> ZKFC
synchronized (elector) {
synchronized (this) {
boolean healthy = lastHealthState == State.SERVICE_HEALTHY;
long remainingDelay = delayJoiningUntilNanotime - System.nanoTime();
if (remainingDelay > 0) {
if (healthy) {
LOG.info("Would have joined master election, but this node is " +
"prohibited from doing so for " +
TimeUnit.NANOSECONDS.toMillis(remainingDelay) + " more ms");
}
scheduleRecheck(remainingDelay);
return;
}
switch (lastHealthState) {
case SERVICE_HEALTHY:
//調(diào)用ActiveStandbyElector的joinElection發(fā)起一次主備選舉;
elector.joinElection(targetToData(localTarget));
if (quitElectionOnBadState) {
quitElectionOnBadState = false;
}
break;
case INITIALIZING:
LOG.info("Ensuring that " + localTarget + " does not " +
"participate in active master election");
//調(diào)用ActiveStandbyElector的quitElection(false)從ZK上刪除已經(jīng)建立的臨時(shí)節(jié)點(diǎn)退出主備選舉,不進(jìn)行隔離;
elector.quitElection(false);
serviceState = HAServiceState.INITIALIZING;
break;
case SERVICE_UNHEALTHY:
case SERVICE_NOT_RESPONDING:
LOG.info("Quitting master election for " + localTarget +
" and marking that fencing is necessary");
//調(diào)用ActiveStandbyElector的quitElection(true)從ZK上刪除已經(jīng)建立的臨時(shí)節(jié)點(diǎn)退出主備選舉,并進(jìn)行隔離;
elector.quitElection(true);
serviceState = HAServiceState.INITIALIZING;
break;
case HEALTH_MONITOR_FAILED:
fatalError("Health monitor failed!");
break;
default:
throw new IllegalArgumentException("Unhandled state:"
+ lastHealthState);
}
}
}
}
HAServiceStatus在狀態(tài)檢測(cè)之中僅起輔助的作用,當(dāng)HAServiceStatus發(fā)生變化時(shí),ZKFC會(huì)判斷NameNode返回的HAServiceStatus與ZKFC所期望的是否相同,如果不相同,ZKFC會(huì)調(diào)用ActiveStandbyElector的quitElection方法刪除當(dāng)前已經(jīng)在ZK上建立的臨時(shí)節(jié)點(diǎn)退出主備選舉。
void verifyChangedServiceState(HAServiceState changedState) {
synchronized (elector) {
synchronized (this) {
if (serviceState == HAServiceState.INITIALIZING) {
if (quitElectionOnBadState) {
LOG.debug("rechecking for electability from bad state");
recheckElectability();
}
return;
}
if (changedState == serviceState) {
serviceStateMismatchCount = 0;
return;
}
if (serviceStateMismatchCount == 0) {
// recheck one more time. As this might be due to parallel transition.
serviceStateMismatchCount++;
return;
}
// quit the election as the expected state and reported state
// mismatches.
LOG.error("Local service " + localTarget
+ " has changed the serviceState to " + changedState
+ ". Expected was " + serviceState
+ ". Quitting election marking fencing necessary.");
delayJoiningUntilNanotime = System.nanoTime()
+ TimeUnit.MILLISECONDS.toNanos(1000);
elector.quitElection(true);
quitElectionOnBadState = true;
serviceStateMismatchCount = 0;
serviceState = HAServiceState.INITIALIZING;
}
}
}
三. 主備選舉
ZKFC通過ActiveStandbyElector的joinElection方法發(fā)起NameNode的主備選舉,這個(gè)過程通過Zookeeper的寫一致性和臨時(shí)節(jié)點(diǎn)機(jī)制實(shí)現(xiàn):
a. 當(dāng)發(fā)起一次主備選舉時(shí),Zookeeper會(huì)嘗試創(chuàng)建臨時(shí)節(jié)點(diǎn)/hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock,Zookeeper的寫一致性保證最終只會(huì)有一個(gè)ActiveStandbyElector創(chuàng)建成功,創(chuàng)建成功的 ActiveStandbyElector對(duì)應(yīng)的NameNode就會(huì)成為主NameNode,ActiveStandbyElector回調(diào)ZKFC的方法將對(duì)應(yīng)的NameNode切換為Active狀態(tài)。而創(chuàng)建失敗的ActiveStandbyElector對(duì)應(yīng)的NameNode成為備NameNode,ActiveStandbyElector回調(diào)ZKFC的方法將對(duì)應(yīng)的NameNode切換為Standby狀態(tài);
private void joinElectionInternal() {
Preconditions.checkState(appData != null,
"trying to join election without any app data");
if (zkClient == null) {
if (!reEstablishSession()) {
fatalError("Failed to reEstablish connection with ZooKeeper");
return;
}
}
createRetryCount = 0;
wantToBeInElection = true;
createLockNodeAsync(); //創(chuàng)建臨時(shí)節(jié)點(diǎn)
}
b.不管是否選舉成功,所有ActiveStandbyElector都會(huì)向Zookeeper注冊(cè)一個(gè)Watcher來監(jiān)聽這個(gè)節(jié)點(diǎn)的狀態(tài)變化事件;
private void monitorLockNodeAsync() {
if (monitorLockNodePending && monitorLockNodeClient == zkClient) {
LOG.info("Ignore duplicate monitor lock-node request.");
return;
}
monitorLockNodePending = true;
monitorLockNodeClient = zkClient;
zkClient.exists(zkLockFilePath, watcher, this, zkClient); //向zookeeper注冊(cè)Watcher監(jiān)聽器
}
c.如果Active NameNode對(duì)應(yīng)的HealthMonitor檢測(cè)到NameNode狀態(tài)異常時(shí),ZKFC會(huì)刪除在Zookeeper上創(chuàng)建的臨時(shí)節(jié)點(diǎn)ActiveStandbyElectorLock,這樣處于Standby NameNode的ActiveStandbyElector注冊(cè)的Watcher就會(huì)收到這個(gè)節(jié)點(diǎn)的 NodeDeleted事件。收到這個(gè)事件后,會(huì)馬上再次創(chuàng)建ActiveStandbyElectorLock,如果創(chuàng)建成功,則Standby NameNode被選舉為Active NameNode。
【防止腦裂】
在分布式系統(tǒng)中腦裂又稱為雙主現(xiàn)象,由于Zookeeper的“假死”,長(zhǎng)時(shí)間的垃圾回收或其它原因都可能導(dǎo)致雙Active NameNode現(xiàn)象,此時(shí)兩個(gè)NameNode都可以對(duì)外提供服務(wù),無(wú)法保證數(shù)據(jù)一致性。對(duì)于生產(chǎn)環(huán)境,這種情況的出現(xiàn)是毀滅性的,必須通過自帶的隔離(Fencing)機(jī)制預(yù)防這種現(xiàn)象的出現(xiàn)。
ActiveStandbyElector為了實(shí)現(xiàn)fencing隔離機(jī)制,在成功創(chuàng)建hadoop-ha/dfs.nameservices/ActiveStandbyElectorLock臨時(shí)節(jié)點(diǎn)后,會(huì)創(chuàng)建另外一個(gè)/hadoop?ha/{dfs.nameservices}/ActiveBreadCrumb持久節(jié)點(diǎn),這個(gè)持久節(jié)點(diǎn)保存了Active NameNode的地址信息。當(dāng)Active NameNode在正常的狀態(tài)下斷開Zookeeper Session (注意由于/hadoop-ha/dfs.nameservices/ActiveStandbyElectorLock是臨時(shí)節(jié)點(diǎn),也會(huì)隨之刪除),會(huì)一起刪除持久節(jié)點(diǎn)/hadoop?ha/{dfs.nameservices}/ActiveBreadCrumb。但是如果ActiveStandbyElector在異常的狀態(tài)下關(guān)閉Zookeeper Session,那么由于/hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb是持久節(jié)點(diǎn),會(huì)一直保留下來。當(dāng)另一個(gè)NameNode(standy => active)選主成功之后,會(huì)注意到上一個(gè)Active NameNode遺留下來的ActiveBreadCrumb節(jié)點(diǎn),從而會(huì)回調(diào)ZKFailoverController的方法對(duì)舊的Active NameNode進(jìn)行fencing。
① 首先ZKFC會(huì)嘗試調(diào)用舊Active NameNode的HAServiceProtocol RPC接口的transitionToStandby方法,看能否將狀態(tài)切換為Standby;
② 如果調(diào)用transitionToStandby方法切換狀態(tài)失敗,那么就需要執(zhí)行Hadoop自帶的隔離措施,Hadoop目前主要提供兩種隔離措施:
sshfence:SSH to the Active NameNode and kill the process;
shellfence:run an arbitrary shell command to fence the Active NameNode;
只有在成功地執(zhí)行完成fencing之后,選主成功的ActiveStandbyElector才會(huì)回調(diào)ZKFC的becomeActive方法將對(duì)應(yīng)的NameNode切換為Active,開始對(duì)外提供服務(wù)。
private boolean becomeActive() {
assert wantToBeInElection;
if (state == State.ACTIVE) {
// already active
return true;
}
try {
Stat oldBreadcrumbStat = fenceOldActive(); //隔離old active NameNode
writeBreadCrumbNode(oldBreadcrumbStat); //更新ActiveBreadCrumb保存的active NameNode地址信息
if (LOG.isDebugEnabled()) {
LOG.debug("Becoming active for " + this);
}
appClient.becomeActive(); //選主成功的ActiveStandbyElector切換NameNode狀態(tài)
state = State.ACTIVE;
return true;
} catch (Exception e) {
LOG.warn("Exception handling the winning of election", e);
// Caller will handle quitting and rejoining the election.
return false;
}
}