本文基于hbase-1.3.0源碼
1. 前言
寫(xiě)在前面,hbase有兩個(gè)地方有Procedure:
- 一個(gè)Procedure的類(lèi)的路徑是
org.apache.hadoop.hbase.procedure2.Procedure。 這是一個(gè)抽象類(lèi),它的一系列實(shí)現(xiàn)類(lèi)。諸如DeleteTableProcedure,DeleteColumnFamilyProcedure等等,HMaster將對(duì)table的元數(shù)據(jù)修改作為一個(gè)Procedure來(lái)實(shí)現(xiàn),一個(gè)Procedure又涉及到若干個(gè)可以嵌套的Subprocedure,一個(gè)procedure的成功一定是在所有subprocedure成功后才算成功,某個(gè)subprocedure失敗則需要回滾整個(gè)過(guò)程,是不是有點(diǎn)事務(wù)的特征。 - 一個(gè)Procedure的類(lèi)的路徑是
org.apache.hadoop.hbase.procedure.Procedure,這個(gè)Procedure是一個(gè)分布式的實(shí)現(xiàn),一個(gè)Procedure同樣有一些Subprocedure,而這些Subprocedure的執(zhí)行則分布在多個(gè)HRegionServer上,和1中一樣只有所有subprocedure成功執(zhí)行后一個(gè)procedure才能成功,在其中某一個(gè)subprocedure失敗后,需要有一種機(jī)制去通知其他成功執(zhí)行的subprocedure,這些成功執(zhí)行subprocedure根據(jù)需要可能需要執(zhí)行回退操作。
本文講的是2中procedure。
2. Procedure設(shè)計(jì)原理
不清楚該怎么翻譯procedure比較好,我的理解是它有點(diǎn)像存儲(chǔ)過(guò)程。實(shí)際上用戶(hù)也可以像使用存儲(chǔ)過(guò)程一樣自定義Procedure讓Hbase去加載,然后在Hbase啟動(dòng)后通過(guò)rpc調(diào)用Procedure,并且可以獲得Procedure執(zhí)行的結(jié)果。
「前言」2 中提到Procedure的執(zhí)行過(guò)程中,會(huì)產(chǎn)生多個(gè)subprocedure分布到不同的HRegionServer上執(zhí)行,是完全的分布式執(zhí)行。分布式執(zhí)行也必然會(huì)面對(duì)諸如網(wǎng)絡(luò)分區(qū),部分執(zhí)行成功,以及全局狀態(tài)協(xié)調(diào)等等處理不好會(huì)導(dǎo)致一致性的問(wèn)題。
Procedure的實(shí)現(xiàn)有點(diǎn)類(lèi)似兩階段提交(2 Phase Commit,下面簡(jiǎn)稱(chēng)2PC), 2PC是實(shí)現(xiàn)分布式事務(wù)的一種經(jīng)典方法,關(guān)于2PC可以參考一下2PC, 這里簡(jiǎn)單介紹:
2PC中存在兩種角色:協(xié)調(diào)者和參與者(對(duì)應(yīng)到本文Procedure和Subprocedure)。2PC分為兩個(gè)階段:
1. 第一階段為prepare階段,協(xié)調(diào)者發(fā)起提議(proposal),詢(xún)問(wèn)所有參與者是否接收提議,
參與者接收到提議后根據(jù)自身情況決定恢復(fù)協(xié)調(diào)者肯定或者否定的答復(fù)。
2. 第二階段,協(xié)調(diào)者等待所有參與者的回復(fù),如果收到所有參與者肯定的答復(fù),就會(huì)通知所有參與者提交之前發(fā)起的提議。
否者不論是收到否定的答復(fù)或者等待超時(shí),都同志所有參與者終止提議。
2PC在協(xié)調(diào)者不出現(xiàn)問(wèn)題的情況下可以保證最終一致性。但是協(xié)調(diào)者在prepare階段發(fā)起提議后奔潰,沒(méi)有新的協(xié)調(diào)者替換導(dǎo)致所有參與者阻塞。
又或者在第二階段協(xié)調(diào)者通知第一個(gè)參與者提交后協(xié)調(diào)者和收到提交請(qǐng)求的參與者都奔潰,新的協(xié)調(diào)者起來(lái)后無(wú)從判斷上次提議狀態(tài),從而都會(huì)出現(xiàn)不一致。
HBase里Procedure通過(guò)zookeeper解決了這些問(wèn)題。
3. 設(shè)計(jì)細(xì)節(jié)
3.1 基本概念
「2」中提到procedure是分布式執(zhí)行的,并且實(shí)現(xiàn)了2PC,有協(xié)調(diào)者和參與者兩種角色,兩種角色分別運(yùn)行在HMaster和HRegionServer上,下文會(huì)分別講HMaster和HRegionServer上不同角色的實(shí)現(xiàn),這其中又會(huì)涉及到以下的一些類(lèi):
1. HMaster上運(yùn)行
-
MasterProcedureManagerHost
名字是XXXHost的一般都是提供運(yùn)行環(huán)境,MasterProcedureManagerHost在HMaster初始化過(guò)程中創(chuàng)建:- 調(diào)用loadProcedures然后它會(huì)默認(rèn)加載
SnapshotManager和MasterFlushTableProcedureManager這兩個(gè)procedure管理類(lèi)(這兩個(gè)管理類(lèi)一個(gè)負(fù)責(zé)管理snapshot procedure,一個(gè)管理flush table的procedure)。
除了默認(rèn)加載以外,還會(huì)加載xml配置項(xiàng)hbase.procedure.master.classes中用戶(hù)自定義的XXXManager。 - 加載好這些manager類(lèi)之后,就是調(diào)用它們的initialize方法完成初始化了。
- 調(diào)用loadProcedures然后它會(huì)默認(rèn)加載
-
MasterProcedureManager
這是一個(gè)抽象類(lèi),繼承自抽象類(lèi)ProcedureManager。它的實(shí)現(xiàn)類(lèi)比如上面提到的SnapshotManager,運(yùn)行在HMaster上,由MasterProcedureManagerHost加載,這些manager負(fù)責(zé)啟動(dòng)并管理同一種類(lèi)的Procedure,因?yàn)橐粋€(gè)Procedure可以被客戶(hù)端調(diào)用多次。
用戶(hù)如果實(shí)現(xiàn)自己的XXXManager,那么提供給HMaster運(yùn)行的就需要實(shí)現(xiàn)MasterProcedureManager,它有如下的抽象方法需要實(shí)現(xiàn):1. public abstract String getProcedureSignature(); 它應(yīng)當(dāng)返回一個(gè)唯一值,客戶(hù)端調(diào)用Procedure是通過(guò)對(duì)應(yīng)的Manager完成的,這就需要通過(guò)這個(gè)signature識(shí)別manager。 2. public abstract void initialize(MasterServices master, MetricsMaster metricsMaster) MasterProcedureManagerHost加載好這個(gè)類(lèi)并實(shí)例化類(lèi)對(duì)象后會(huì)調(diào)用這個(gè)方法完成初始化工作 3. public void execProcedure(ProcedureDescription desc) ; 客戶(hù)端通過(guò)rpc有兩種rpc方法去調(diào)用一個(gè)procedure,這是其中一種,沒(méi)有返回值,參數(shù)類(lèi)型ProcedureDescription定在protobuf文件中,下面是它的定義: message ProcedureDescription { required string signature = 1; xxxManager的getProcedureSignature返回值 optional string instance = 2; 相當(dāng)于給每次調(diào)用的procedure取一個(gè)名字 optional int64 creation_time = 3 [default = 0]; repeated NameStringPair configuration = 4; name:value形式的參數(shù) } HBase只是替我們實(shí)現(xiàn)了2PC框架,提供一些幫助類(lèi),至于調(diào)用的procedure具體做什么用戶(hù)應(yīng)該自己在exeProcedure中實(shí)現(xiàn)。 -
Procedure(
org.apache.hadoop.hbase.procedure.Procedure)
上面提到用戶(hù)自定義的procedure,在HMaster端需要繼承MasterProcedureManager,然后在execProcedure中去實(shí)現(xiàn)核心邏輯,一個(gè)完整的procedure還需要分布在hregionServer上的Subprocedure們配合。HBase提供了2pc實(shí)現(xiàn)然用戶(hù)能夠使用它以使得一個(gè)procedure中各個(gè)subprocedure配合完成,類(lèi)Procedure,我的理解就是HBase提供2PC實(shí)現(xiàn)的核心,下面是一段這個(gè)類(lèi)介紹注釋?zhuān)?/p>This class encapsulates state and methods for tracking and managing a distributed procedure, as well as aborting if any member encounters a problem or if a cancellation is requested
Procedure有2個(gè)重要方法:
- sendGlobalBarrierStart
相當(dāng)于2PC prepare階段,調(diào)用這個(gè)方法等待所有參與者(Subprocedure)完成prepare。 - sendGlobalBarrierReached
相當(dāng)于2PC的commit階段,調(diào)用這個(gè)方法等待所有參與者(Subprocedure)commit完成。
關(guān)于Procedure的具體實(shí)現(xiàn)原理下文會(huì)介紹。
- sendGlobalBarrierStart
2. HRegionServer上運(yùn)行
HRegionServer上基本都會(huì)有一個(gè)與HMaster相對(duì)應(yīng)的類(lèi)實(shí)現(xiàn),一個(gè)procedure的全部過(guò)程是需要RegionServer和HMaster配合完成的。
-
RegionServerProcedureManagerHost
對(duì)應(yīng)MasterProcedureManagerHost,它在HRegionServer啟動(dòng)過(guò)程中完成創(chuàng)建和初始化:- 調(diào)用loadProcedures,加載用戶(hù)配置項(xiàng)
hbase.procedure.regionserver.classes定義的procedureManager,運(yùn)行在HRegionServer上的procedureManager必須要繼承RegionServerProcedureManager, 這些XXXManager和HMaster上的XXXManager對(duì)應(yīng),相當(dāng)于管理procedure中的subpProcedure(參與者)。
處理用戶(hù)配置的以外,還會(huì)默認(rèn)加載RegionServerSnapshotManager和RegionServerFlushTableProcedureManager,對(duì)應(yīng)HMaster上默認(rèn)加載的SnapshotManager和MasterFlushTableProcedureManager. - 調(diào)用initialize,方法中調(diào)用每一個(gè)加載的XXXManager的initialize方法完成初始化。
- 調(diào)用start, start方法中調(diào)用每一個(gè)XXXManager的start方法。
- 調(diào)用loadProcedures,加載用戶(hù)配置項(xiàng)
-
RegionServerProcedureManager
對(duì)應(yīng)MasterProcedureManager,同樣是一個(gè)抽象類(lèi),同樣繼承自ProcedureManager??蛻?hù)端調(diào)用procedure時(shí),總是會(huì)先走到MasterProcedureManager的具體實(shí)現(xiàn)類(lèi)上執(zhí)行execProcedure,在execProcedure的實(shí)現(xiàn)邏輯里使用Procedure這個(gè)類(lèi)提供的方法完成協(xié)調(diào)者的工作。接著就是參與者RegionServerProcedureManager的工作:接受協(xié)調(diào)者發(fā)送的兩個(gè)階段的請(qǐng)求,處理請(qǐng)求,反饋給協(xié)調(diào)者。RegionServerProcedureManager有兩個(gè)主要的抽象方法需要實(shí)現(xiàn):public abstract void initialize(RegionServerServices rss) throws KeeperException; public abstract void start(); 這兩個(gè)方法正是RegionServerProcedureManagerHost的initialize和start時(shí)會(huì)調(diào)用的。 由于在MasterProcedureManager的execProcedure中可以使用Procedure這個(gè)類(lèi)提供的方法完成兩個(gè)階段的協(xié)調(diào)。那么在start方法中,我們應(yīng)該可以去接受協(xié)調(diào)者的兩個(gè)階段請(qǐng)求,然后做出處理。 -
Subprocedure
這是一個(gè)抽象類(lèi),同時(shí)它繼承Callable接口,參與者的重要邏輯應(yīng)該繼承Subprocedure,并實(shí)現(xiàn)它的三個(gè)主要方法:1. abstract public void acquireBarrier() throws ForeignException; prepare階段參與者接收到協(xié)調(diào)者prepare請(qǐng)求后需要在這個(gè)方法里完成主要邏輯。拋出異常意味著否定的答復(fù)。 2. abstract public byte[] insideBarrier() throws ForeignException; 第二階段,意味著所有參與者(也就是運(yùn)行在其他RegionServer上的Subprocdure完成了prepare階段acquireBarrier調(diào)用),進(jìn)入提交階段。 這是一個(gè)有返回值的方法,它的返回值會(huì)傳遞給協(xié)調(diào)者,也就是Procedure。 3. abstract public void cleanup(Exception e); 以上兩個(gè)階段出現(xiàn)異常,都會(huì)調(diào)用這個(gè)方法完成本地的一些清理操作:資源釋放,已完成操作的回滾等等。 以上方法都在call()方法中完成,作為用戶(hù)來(lái)說(shuō)不需要關(guān)心,如何接受協(xié)調(diào)者 的請(qǐng)求,以及回復(fù)協(xié)調(diào)者。call方法中完成了這些調(diào)用。 call 方法有以下調(diào)用: -> acquireBarrier -> rpcs.sendMemberAcquired(this); 參與者答復(fù)協(xié)調(diào)者prepare肯定請(qǐng)求。 -> waitForReachedGlobalBarrier();等待所有參與者完成prepare階段 -> insideBarrier -> rpcs.sendMemberCompleted();通知協(xié)調(diào)者,當(dāng)前參與者完成第二階段 commit 以上過(guò)程出現(xiàn)異常,都會(huì)調(diào)用cancel通知協(xié)調(diào)者取消本次procedure,再調(diào)用cleanup完成本地清理工作。 ProcedureMember
ProcedureMemeber負(fù)責(zé)創(chuàng)建、運(yùn)行、管理subprocedure。
3.2 細(xì)節(jié) - 基于zookeeper的實(shí)現(xiàn)
盡管「3.1」中提到了master端(協(xié)調(diào)者),使用Procedure這個(gè)類(lèi)來(lái)協(xié)調(diào)兩個(gè)階段。regionserver端(參與者)在Subprocedure完成具體邏輯, 使用ProcedureMember來(lái)管理Subprocedure,但沒(méi)有涉及2PC的實(shí)現(xiàn)。
HBase目前提供了基于Zookeeper實(shí)現(xiàn)的2PC, 當(dāng)然由于運(yùn)行在HMaster和HRegionServer上的XXXProcedureMananger都完全是有用戶(hù)自己實(shí)現(xiàn),用戶(hù)也可以基于其他途徑實(shí)現(xiàn)2PC, 然后替換掉Procedure類(lèi)中的zk的實(shí)現(xiàn)。
下面主要講一下基于zookeeper的實(shí)現(xiàn),同樣分為HMaster和HRegionServer量方面。
3.2.1 HMaster端實(shí)現(xiàn)
1. Procedure
由于前文提到XXXProcedureManager主要使用Procedure類(lèi)去實(shí)現(xiàn)兩個(gè)階段的協(xié)調(diào),先看看Procedure類(lèi)。下面是Procedure的核心成員:
1. final private String procName;
每創(chuàng)建一個(gè)新的procedure都應(yīng)該有一個(gè)不同的名稱(chēng),客戶(hù)端rpc調(diào)用procedure時(shí)可以指定,參考前面說(shuō)的protobuf定義文件ProcedureDescription的instance成員。
2.final private byte[] args;
args參數(shù)是在prepare請(qǐng)求階段發(fā)送給參與者的,所有的參與者都會(huì)收到參數(shù)。
3. final CountDownLatch acquiredBarrierLatch;
final CountDownLatch releasedBarrierLatch;
final CountDownLatch completedLatch;
這三個(gè)計(jì)數(shù)同步器,分別用來(lái)等待所有參與者完成prepare,完成commit,等待procedure結(jié)束。
4. private final List<String> acquiringMembers;
private final List<String> inBarrierMembers;
private final HashMap<String, byte[]> dataFromFinishedMembers;
acquiringMembers中保存進(jìn)入prepare階段的參與者;
inBarrierMembers完成prepare需要進(jìn)入commit階段的參與者;
dataFromFinishedMembers保存完成commit后的參與者返回的數(shù)據(jù),參考前文Subprocedure # insideBarrier方法返回值。
5. private ProcedureCoordinator coord;
看類(lèi)的名字也知道Procedure使用它來(lái)完成真正的兩個(gè)階段的協(xié)調(diào)工作.
它還負(fù)責(zé)Procedure的創(chuàng)建以及運(yùn)行。下文重點(diǎn)講一下ProcedureCoordinator。
2. ProcedureCoordinator
ProcedureCoordinator負(fù)責(zé)創(chuàng)建Procedure,并且提交Procedure到線(xiàn)程池運(yùn)行,反過(guò)來(lái)Procedure又調(diào)用ProcedureCoordinator去完成所有subprocedure的兩個(gè)階段的協(xié)調(diào)。
上文提到Procedure的兩個(gè)主要方法:
1. sendGlobalBarrierStart -- 請(qǐng)求所有參與者prepare
2. sendGlobalBarrierReached -- 請(qǐng)求所有參與者commit
這兩個(gè)方法里面分別調(diào)用了:
1. coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));
- 通知所有參與者也就是this.acquiringMembers 進(jìn)入prepare階段
2. coord.getRpcs().sendGlobalBarrierReached(this, Lists.newArrayList(inBarrierMembers));
- 所有參與者完成prepare,通知它們進(jìn)入commit階段。
coord.getRpcs返回ProcedureCoordinatorRpcs,這是一個(gè)接口,顯然它是委派它來(lái)通知所有參與者進(jìn)入prepare,再進(jìn)入commit階段。
HBase提供了ProcedureCoordinatorRpcs一種基于Zookeeper的實(shí)現(xiàn):ZKProcedureCoordinatorRpcs 。
而且現(xiàn)在運(yùn)行在HMaster上的各種XXXProcedureManager都使用這種基于zookeeper的實(shí)現(xiàn)。
3. ZKProcedureCoordinatorRpcs
它實(shí)現(xiàn)了接口ProcedureCoordinatorRpcs,有如下核心方法:
1. boolean start(final ProcedureCoordinator listener);
- ProcedureCoordinator在構(gòu)造函數(shù)中調(diào)用start
1. sendGlobalBarrierAcquire(Procedure procName, byte[] info, List<String> members)
- 通知進(jìn)入prepare階段, info是傳遞給各個(gè)參與者的信息,members即參與者名稱(chēng)。
2. sendGlobalBarrierReached(Procedure procName, List<String> members)
- 通知進(jìn)入commit階段
3. sendAbortToMembers(Procedure procName, ForeignException cause)
- 參與者失敗時(shí)調(diào)用,通知自身無(wú)法完成本次procedure。
4. resetMembers(Procedure procName)
- 完成procedure后調(diào)用,通知參與者重置自身的狀態(tài)
下面說(shuō)說(shuō)ZKProcedureCoordinatorRpcs怎么基于zk實(shí)現(xiàn)了這些方法:
它有如下成員:
ZooKeeperWatcher watcher; -- zookeeper訪(fǎng)問(wèn)客戶(hù)端
String procedureType; -- HMaster上的XXXProcedureManager創(chuàng)建ZKProcedureCoordinatorRpcs一般使用自身getProcedureSignature返回值初始化這個(gè)域,這個(gè)字段需要保持唯一
String coordName;
-- 不是很重要,一般就是當(dāng)前運(yùn)行的server名稱(chēng)。
ProcedureCoordinator coordinator;
-- ProcedureCoordinator實(shí)例,也就是它們ProcedureCoordinator和ZKProcedureCoordinatorRpcs互相持有對(duì)方。
- start方法
final public boolean start(final ProcedureCoordinator coordinator) {
if (this.coordinator != null) {
throw new IllegalStateException(
"ZKProcedureCoordinator already started and already has listener installed");
}
this.coordinator = coordinator;
try {
/**
假設(shè)procedureType是pt, ZKProcedureUtil構(gòu)造函數(shù)會(huì)在zk上創(chuàng)建下面三個(gè)node:
1. acquiredZnode : /hbase/pt/acquired
2. reachedZnode: /hbase/pt/reached
3. abortZnode: /hbase/pt/abort
所以基于zk的實(shí)現(xiàn)的2PC是通過(guò)node的改變來(lái)通知所有參與者當(dāng)前處在哪一個(gè)階段。
*/
this.zkProc = new ZKProcedureUtil(watcher, procedureType) {
@Override
// nodeCreated方法會(huì)在zk上新的node創(chuàng)建時(shí)回調(diào)
public void nodeCreated(String path) {
//判斷一下node的路徑是不是/hbase/pt,不是的話(huà)表示node改變和當(dāng)前procedure沒(méi)關(guān)系
if (!isInProcedurePath(path)) return;
LOG.debug("Node created: " + path);
logZKTree(this.baseZNode);
// 判斷新建的node是不是符合路徑/hbase/pt/acquired/{procedureName}/{memberName}.
// 關(guān)于procedureName,每一個(gè)procedure調(diào)用都會(huì)有一個(gè)新的名稱(chēng). memberName即參與者名稱(chēng)。
// 這個(gè)路徑的創(chuàng)建表明有一個(gè)參與者完成prepare階段。后面講到參與者部分時(shí)會(huì)提到參與者完成prepare后會(huì)創(chuàng)建這個(gè)路徑。
if (isAcquiredPathNode(path)) {
// 通知一下一個(gè)參與者完成prepare,畢竟Procedure 還阻塞在acquiredBarrierLatch上等待參與者都完成prepare階段。
coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
ZKUtil.getNodeName(path));
// 和上面的if類(lèi)似,判斷新建的node符不符合/hbase/pt/reached/{procedureName}/{memberName}. 符合表明一個(gè)參與者完成commit階段。
} else if (isReachedPathNode(path)) {
String procName = ZKUtil.getNodeName(ZKUtil.getParent(path));
String member = ZKUtil.getNodeName(path);
// get the data from the procedure member
try {
/**
前面說(shuō)到Subprocedure # insideBarrier會(huì)有返回值,這個(gè)返回值被設(shè)置成node的data,此處解析出返回值。
*/
byte[] dataFromMember = ZKUtil.getData(watcher, path);
// ProtobufUtil.isPBMagicPrefix will check null
if (dataFromMember != null && dataFromMember.length > 0) {
if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
ForeignException ee = new ForeignException(coordName,
"Failed to get data from finished node or data is illegally formatted:"
+ path);
coordinator.abortProcedure(procName, ee);
} else {
dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
dataFromMember.length);
LOG.debug("Finished data from procedure '" + procName
+ "' member '" + member + "': " + new String(dataFromMember));
/**
通知一個(gè)參與者完成commit,此時(shí)Procedure阻塞在releasedBarrierLatch等待所有參與者完成commit。
*/
coordinator.memberFinishedBarrier(procName, member, dataFromMember);
}
} else {
coordinator.memberFinishedBarrier(procName, member, dataFromMember);
}
} catch (KeeperException e) {
ForeignException ee = new ForeignException(coordName, e);
coordinator.abortProcedure(procName, ee);
} catch (InterruptedException e) {
ForeignException ee = new ForeignException(coordName, e);
coordinator.abortProcedure(procName, ee);
}
} else if (isAbortPathNode(path)) {
abort(path);
} else {
LOG.debug("Ignoring created notification for node:" + path);
}
}
};
zkProc.clearChildZNodes();
} catch (KeeperException e) {
...
}
...
}
- sendGlobalBarrierAcquire 通知參與者prepare
// 1. info 是傳遞給進(jìn)入prepare的subprocedure的
// 2. nodeNames 即所有參與者,
final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
throws IOException, IllegalArgumentException {
String procName = proc.getName();
String abortNode = zkProc.getAbortZNode(procName);
try {
// 檢查/hbase/pt/abort/{procedureName}是否存在,存在則表明放棄procedureName這個(gè)procedure。
if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
abort(abortNode);
}
} catch (KeeperException e) {
...
}
// node: /hbase/pt/acquired/{procedureName}
String acquire = zkProc.getAcquiredBarrierNode(procName);
try {
byte[] data = ProtobufUtil.prependPBMagic(info);
// 在zk上創(chuàng)建/hbase/pt/acquired/{procedureName}節(jié)點(diǎn)
// 后面在說(shuō)subprocedure會(huì)提到,subprocedure會(huì)監(jiān)控這個(gè)階段判斷是不是一個(gè)新的procedure啟動(dòng)了,一旦監(jiān)控到這個(gè)節(jié)點(diǎn)參與者就進(jìn)入prepare階段
ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
for (String node : nodeNames) {
// znode : /hbase/pt/acquired/{procedureName}/{memberName}
String znode = ZKUtil.joinZNode(acquire, node);
// 檢查/hbase/pt/acquired/{procedureName}/{memberName}是否存在,一旦存在表明memberName這個(gè)參與者完成prepare。
if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
//通知一個(gè)member完成prepare
coordinator.memberAcquiredBarrier(procName, node);
}
}
} catch (KeeperException e) {
...
}
}
-
sendGlobalBarrierReached
這里不再貼代碼,它和sendGlobalBarrierAcquire工作過(guò)程差不多:- 創(chuàng)建
/hbase/pt/reached/{procedureName}這個(gè)node,所有監(jiān)控這個(gè)node的參與者因此進(jìn)入commit階段。 - 對(duì)所有的member, 檢查
/hbase/pt/reached/{procedureName}/{memberName}是否存在,存在表明memberName這個(gè)參與者完成commit。
- 創(chuàng)建
-
總結(jié)
基于zk的實(shí)現(xiàn), zk上會(huì)存在如下node:/hbase/{procedureSignature} | - /required |- /{procedureName} | - /{memberName-1} ... | - /{memberName-n} | - /reached |- /{procedureName} | - /{memberName-1} ... | - /{memberName-n} | -/abort | - /{procedureName} 1. 協(xié)調(diào)者創(chuàng)建/required/{procedureName} 2. 參與者監(jiān)聽(tīng)到/required/{procedureName}的存在后,進(jìn)入prepare階段 3. 參與者memberName-i調(diào)用Subprocedure # acquireBarrier 4. 參與者memberName-i完成prepare階段,創(chuàng)建/required/{procedureName}/{memberName-i}這個(gè)node通知協(xié)調(diào)者完成prepare 4. 協(xié)調(diào)者等待所有參與者完成prepare,也就是n個(gè)參與者創(chuàng)建了memberName-1 to memberName-n這n個(gè)子節(jié)點(diǎn)。 5. 協(xié)調(diào)者創(chuàng)建/reached/{procedureName},通知參與者進(jìn)入commit ...
3.2.2 HRegionServer端實(shí)現(xiàn)
1.ZKProcedureMemberRpcs
3.1 # 2節(jié)中提到Subprocedure使用rpcs來(lái)響應(yīng)協(xié)調(diào)者Procedure。rpcs對(duì)應(yīng)接口ProcedureMemberRpcs, ZKProcedureMemberRpcs實(shí)現(xiàn)了這個(gè)接口,該接口有如下主要方法:
1. void start(final String memberName, final ProcedureMember member);
- 完成一些初始化或者線(xiàn)程池啟動(dòng)任務(wù),一般是運(yùn)行在HRegionServer上的XXXProcedureManager會(huì)在其start方法中完成這個(gè)方法的調(diào)用。
2. void sendMemberAborted(Subprocedure sub, ForeignException cause) throws IOException;
- 通知協(xié)調(diào)者放棄
3. void sendMemberAcquired(Subprocedure sub) throws IOException;
- 通知協(xié)調(diào)者,完成prepare
4. void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException;
- 通知協(xié)調(diào)者,完成commit
上后三個(gè)方法也就是Subprocedure使用來(lái)反饋給協(xié)調(diào)者狀態(tài)的方法。
下面是ZKProcedureMemberRpcs的核心成員:
1. protected ProcedureMember member;
用于創(chuàng)建,運(yùn)行subprocedure
看看ZKProcedureMemberRpcs是怎么實(shí)現(xiàn)這些接口的,下面是ZKProcedureMemberRpcs構(gòu)造函數(shù):
public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
throws KeeperException {
// 監(jiān)控zookeeper node變化,上文協(xié)調(diào)者端‘ ZKProcedureCoordinatorRpcs’也使用了, procType也就是getProcedureSignature返回值
this.zkController = new ZKProcedureUtil(watcher, procType) {
// 新node創(chuàng)建是回調(diào)
@Override
public void nodeCreated(String path) {
// 如果不是 /hbase/{procedureSignature}路徑,表明不是procedure相關(guān)的事件
if (!isInProcedurePath(path)) {
return;
}
// 如果是新建acquiredNode: /hbase/{procedureSignature}/required
// 按照上文講述ZKProcedureCoordinatorRpcs會(huì)在start方法中創(chuàng)建,
// 也就是在運(yùn)行在HMaster上的XXXProcedureManager初始化過(guò)程中創(chuàng)建。
if (isAcquiredNode(path)) {
// 這個(gè)方法查看acquiredNode的子節(jié)點(diǎn),由于子節(jié)點(diǎn)名字是procedureName
//表示一個(gè)新的procedure, 這個(gè)方法里獲取子節(jié)點(diǎn),使用ProcedureMember啟動(dòng)創(chuàng)建啟動(dòng)Subprocedure.
waitForNewProcedures();
return;
} else if (isAbortNode(path)) {
// 監(jiān)控/hbase/{procedureSignature}/abort節(jié)點(diǎn)
watchForAbortedProcedures();
return;
}
String parent = ZKUtil.getParent(path);
// 如果父節(jié)點(diǎn)是/hbase/{procedureSignature}/reached
// 表示協(xié)調(diào)者通知procedureName這個(gè)subprocedure進(jìn)入commit階段
// 由于ProcedureMember管理所有subprocedure,委托ProcedureMember去通知相應(yīng)subprocedure去調(diào)用insideBarrier完成commit
if (isReachedNode(parent)) {
receivedReachedGlobalBarrier(path);
return;
} else if (isAbortNode(parent)) {
abort(path);
return;
} else if (isAcquiredNode(parent)) {
// 如果父節(jié)點(diǎn)是/hbase/{procedureSignature}/required
// 表示協(xié)調(diào)者通知procedureName這個(gè)subprocedure進(jìn)入prepare階段
// 由于ProcedureMember管理所有subprocedure,委托ProcedureMember去通知相應(yīng)subprocedure去調(diào)用acquireBarrier完成prepare
startNewSubprocedure(path);
} else {
LOG.debug("Ignoring created notification for node:" + path);
}
}
@Override
public void nodeChildrenChanged(String path) {
if (path.equals(this.acquiredZnode)) {
LOG.info("Received procedure start children changed event: " + path);
waitForNewProcedures();
} else if (path.equals(this.abortZnode)) {
LOG.info("Received procedure abort children changed event: " + path);
watchForAbortedProcedures();
}
}
};
}
- sendMemberAcquired
前文提到Subprocedure調(diào)用acquireBarrier完成prepare階段后,會(huì)使用這個(gè)方法通知協(xié)調(diào)者自己完成prepare。
同時(shí)在「3.2.1」最后提到協(xié)調(diào)者端ZKProcedureCoordinatorRpcs會(huì)在prepare階段一直監(jiān)聽(tīng)/hbase/{procedureSignature}/acquire,直到該node下n個(gè)參與者都創(chuàng)建了子節(jié)點(diǎn)memberName-i(0 <= i < n)。
顯然這個(gè)方法里就是在 /hbase/{procedureSignature}/acquire下以當(dāng)前member name創(chuàng)建新的節(jié)點(diǎn)。
- sendMemberCompleted
和上面一樣,Subprocedure調(diào)用insideBarrier完成commit之后,調(diào)用這個(gè)方法在/hbase/{procedureSignature}/reached/下創(chuàng)建以當(dāng)前member name命名的新節(jié)點(diǎn)通知協(xié)調(diào)者。
以上,差不多就是HBase procedure實(shí)現(xiàn)兩階段協(xié)議的過(guò)程。
總結(jié)一下用戶(hù)如果自定義Procedure需要實(shí)現(xiàn)一下部分:
- master端
- 實(shí)現(xiàn)MasterProcedureManager抽象:
- initialize 初始化工作
- getProcedureSignature 返回一個(gè)唯一的名稱(chēng)
- execProcedure, 中創(chuàng)建Procedure,創(chuàng)建ProcedureCoordinator,使用ProcedureCoordinator提交運(yùn)行Procedure。ProcedureCoordinator又委托ZkProcedureCoordinatorRpcs來(lái)協(xié)調(diào)兩個(gè)階段。
- 實(shí)現(xiàn)MasterProcedureManager抽象:
- hregionserver端:
- 實(shí)現(xiàn)RegionServerProcedureManager抽象類(lèi),
- 實(shí)現(xiàn)initialize 初始化,比如創(chuàng)建ProcedureMember,需要提供給ProcedureMember一個(gè)工廠(chǎng)類(lèi)創(chuàng)建Subprocedure。 創(chuàng)建ZKProcedureMemberRpcs實(shí)例,這個(gè)實(shí)例委托ProcedureMember創(chuàng)建、管理subprocedure。
- getProcedureSignature
返回值應(yīng)該和對(duì)應(yīng)的Master上的XXXProcedureManager一樣 - start方法
啟動(dòng)ZKProcedureMemberRpcs實(shí)例,它會(huì)開(kāi)始監(jiān)聽(tīng)相關(guān)節(jié)點(diǎn)。
- 實(shí)現(xiàn)Subprocedure抽象類(lèi)
- 實(shí)現(xiàn)acquireBarrier方法,完成prepare階段,比如提前鎖住一些需要提交的資源
- 實(shí)現(xiàn)insideBarrier方法,完成commit階段。
- 實(shí)現(xiàn)RegionServerProcedureManager抽象類(lèi),
以上可以看出master和regionsever上分別使用ZkProcedureCoordinatorRpcs和ZKProcedureMemberRpcs來(lái)實(shí)現(xiàn)基于zk的2pc,用戶(hù)也可以實(shí)現(xiàn)接口ProcedureCoordinatorRpcs使用其他途徑完成2PC的過(guò)程。