HBase學(xué)習(xí) - Procedure

本文基于hbase-1.3.0源碼

1. 前言

寫(xiě)在前面,hbase有兩個(gè)地方有Procedure:

  1. 一個(gè)Procedure的類(lèi)的路徑是org.apache.hadoop.hbase.procedure2.Procedure。 這是一個(gè)抽象類(lèi),它的一系列實(shí)現(xiàn)類(lèi)。諸如DeleteTableProcedure,DeleteColumnFamilyProcedure等等,HMaster將對(duì)table的元數(shù)據(jù)修改作為一個(gè)Procedure來(lái)實(shí)現(xiàn),一個(gè)Procedure又涉及到若干個(gè)可以嵌套的Subprocedure,一個(gè)procedure的成功一定是在所有subprocedure成功后才算成功,某個(gè)subprocedure失敗則需要回滾整個(gè)過(guò)程,是不是有點(diǎn)事務(wù)的特征。
  2. 一個(gè)Procedure的類(lèi)的路徑是org.apache.hadoop.hbase.procedure.Procedure,這個(gè)Procedure是一個(gè)分布式的實(shí)現(xiàn),一個(gè)Procedure同樣有一些Subprocedure,而這些Subprocedure的執(zhí)行則分布在多個(gè)HRegionServer上,和1中一樣只有所有subprocedure成功執(zhí)行后一個(gè)procedure才能成功,在其中某一個(gè)subprocedure失敗后,需要有一種機(jī)制去通知其他成功執(zhí)行的subprocedure,這些成功執(zhí)行subprocedure根據(jù)需要可能需要執(zhí)行回退操作。

本文講的是2中procedure。

2. Procedure設(shè)計(jì)原理

不清楚該怎么翻譯procedure比較好,我的理解是它有點(diǎn)像存儲(chǔ)過(guò)程。實(shí)際上用戶(hù)也可以像使用存儲(chǔ)過(guò)程一樣自定義Procedure讓Hbase去加載,然后在Hbase啟動(dòng)后通過(guò)rpc調(diào)用Procedure,并且可以獲得Procedure執(zhí)行的結(jié)果。

「前言」2 中提到Procedure的執(zhí)行過(guò)程中,會(huì)產(chǎn)生多個(gè)subprocedure分布到不同的HRegionServer上執(zhí)行,是完全的分布式執(zhí)行。分布式執(zhí)行也必然會(huì)面對(duì)諸如網(wǎng)絡(luò)分區(qū),部分執(zhí)行成功,以及全局狀態(tài)協(xié)調(diào)等等處理不好會(huì)導(dǎo)致一致性的問(wèn)題。

Procedure的實(shí)現(xiàn)有點(diǎn)類(lèi)似兩階段提交(2 Phase Commit,下面簡(jiǎn)稱(chēng)2PC), 2PC是實(shí)現(xiàn)分布式事務(wù)的一種經(jīng)典方法,關(guān)于2PC可以參考一下2PC, 這里簡(jiǎn)單介紹:

2PC中存在兩種角色:協(xié)調(diào)者和參與者(對(duì)應(yīng)到本文Procedure和Subprocedure)。2PC分為兩個(gè)階段:
1. 第一階段為prepare階段,協(xié)調(diào)者發(fā)起提議(proposal),詢(xún)問(wèn)所有參與者是否接收提議,
   參與者接收到提議后根據(jù)自身情況決定恢復(fù)協(xié)調(diào)者肯定或者否定的答復(fù)。
2. 第二階段,協(xié)調(diào)者等待所有參與者的回復(fù),如果收到所有參與者肯定的答復(fù),就會(huì)通知所有參與者提交之前發(fā)起的提議。
   否者不論是收到否定的答復(fù)或者等待超時(shí),都同志所有參與者終止提議。

2PC在協(xié)調(diào)者不出現(xiàn)問(wèn)題的情況下可以保證最終一致性。但是協(xié)調(diào)者在prepare階段發(fā)起提議后奔潰,沒(méi)有新的協(xié)調(diào)者替換導(dǎo)致所有參與者阻塞。
又或者在第二階段協(xié)調(diào)者通知第一個(gè)參與者提交后協(xié)調(diào)者和收到提交請(qǐng)求的參與者都奔潰,新的協(xié)調(diào)者起來(lái)后無(wú)從判斷上次提議狀態(tài),從而都會(huì)出現(xiàn)不一致。

HBase里Procedure通過(guò)zookeeper解決了這些問(wèn)題。

3. 設(shè)計(jì)細(xì)節(jié)

3.1 基本概念

「2」中提到procedure是分布式執(zhí)行的,并且實(shí)現(xiàn)了2PC,有協(xié)調(diào)者和參與者兩種角色,兩種角色分別運(yùn)行在HMaster和HRegionServer上,下文會(huì)分別講HMaster和HRegionServer上不同角色的實(shí)現(xiàn),這其中又會(huì)涉及到以下的一些類(lèi):

1. HMaster上運(yùn)行

  • MasterProcedureManagerHost
    名字是XXXHost的一般都是提供運(yùn)行環(huán)境,MasterProcedureManagerHost在HMaster初始化過(guò)程中創(chuàng)建:

    1. 調(diào)用loadProcedures然后它會(huì)默認(rèn)加載SnapshotManagerMasterFlushTableProcedureManager這兩個(gè)procedure管理類(lèi)(這兩個(gè)管理類(lèi)一個(gè)負(fù)責(zé)管理snapshot procedure,一個(gè)管理flush table的procedure)。
      除了默認(rèn)加載以外,還會(huì)加載xml配置項(xiàng)hbase.procedure.master.classes中用戶(hù)自定義的XXXManager。
    2. 加載好這些manager類(lèi)之后,就是調(diào)用它們的initialize方法完成初始化了。
  • MasterProcedureManager
    這是一個(gè)抽象類(lèi),繼承自抽象類(lèi)ProcedureManager。它的實(shí)現(xiàn)類(lèi)比如上面提到的SnapshotManager,運(yùn)行在HMaster上,由MasterProcedureManagerHost加載,這些manager負(fù)責(zé)啟動(dòng)并管理同一種類(lèi)的Procedure,因?yàn)橐粋€(gè)Procedure可以被客戶(hù)端調(diào)用多次。
    用戶(hù)如果實(shí)現(xiàn)自己的XXXManager,那么提供給HMaster運(yùn)行的就需要實(shí)現(xiàn)MasterProcedureManager,它有如下的抽象方法需要實(shí)現(xiàn):

    1. public abstract String getProcedureSignature();  
       它應(yīng)當(dāng)返回一個(gè)唯一值,客戶(hù)端調(diào)用Procedure是通過(guò)對(duì)應(yīng)的Manager完成的,這就需要通過(guò)這個(gè)signature識(shí)別manager。
    2. public abstract void initialize(MasterServices master, MetricsMaster metricsMaster)
      MasterProcedureManagerHost加載好這個(gè)類(lèi)并實(shí)例化類(lèi)對(duì)象后會(huì)調(diào)用這個(gè)方法完成初始化工作
    3. public void execProcedure(ProcedureDescription desc) ;
       客戶(hù)端通過(guò)rpc有兩種rpc方法去調(diào)用一個(gè)procedure,這是其中一種,沒(méi)有返回值,參數(shù)類(lèi)型ProcedureDescription定在protobuf文件中,下面是它的定義:
         message ProcedureDescription {
                 required string signature = 1;  xxxManager的getProcedureSignature返回值
                 optional string instance = 2;   相當(dāng)于給每次調(diào)用的procedure取一個(gè)名字
                 optional int64 creation_time = 3 [default = 0];
                repeated NameStringPair configuration = 4;  name:value形式的參數(shù)
          }
       HBase只是替我們實(shí)現(xiàn)了2PC框架,提供一些幫助類(lèi),至于調(diào)用的procedure具體做什么用戶(hù)應(yīng)該自己在exeProcedure中實(shí)現(xiàn)。
    
  • Procedure(org.apache.hadoop.hbase.procedure.Procedure
    上面提到用戶(hù)自定義的procedure,在HMaster端需要繼承MasterProcedureManager,然后在execProcedure中去實(shí)現(xiàn)核心邏輯,一個(gè)完整的procedure還需要分布在hregionServer上的Subprocedure們配合。HBase提供了2pc實(shí)現(xiàn)然用戶(hù)能夠使用它以使得一個(gè)procedure中各個(gè)subprocedure配合完成,類(lèi)Procedure,我的理解就是HBase提供2PC實(shí)現(xiàn)的核心,下面是一段這個(gè)類(lèi)介紹注釋?zhuān)?/p>

    This class encapsulates state and methods for tracking and managing a distributed procedure, as well as aborting if any member encounters a problem or if a cancellation is requested

    Procedure有2個(gè)重要方法:

    1. sendGlobalBarrierStart
      相當(dāng)于2PC prepare階段,調(diào)用這個(gè)方法等待所有參與者(Subprocedure)完成prepare。
    2. sendGlobalBarrierReached
      相當(dāng)于2PC的commit階段,調(diào)用這個(gè)方法等待所有參與者(Subprocedure)commit完成。
      關(guān)于Procedure的具體實(shí)現(xiàn)原理下文會(huì)介紹。

2. HRegionServer上運(yùn)行
HRegionServer上基本都會(huì)有一個(gè)與HMaster相對(duì)應(yīng)的類(lèi)實(shí)現(xiàn),一個(gè)procedure的全部過(guò)程是需要RegionServer和HMaster配合完成的。

  • RegionServerProcedureManagerHost
    對(duì)應(yīng)MasterProcedureManagerHost,它在HRegionServer啟動(dòng)過(guò)程中完成創(chuàng)建和初始化:

    1. 調(diào)用loadProcedures,加載用戶(hù)配置項(xiàng)hbase.procedure.regionserver.classes定義的procedureManager,運(yùn)行在HRegionServer上的procedureManager必須要繼承RegionServerProcedureManager, 這些XXXManager和HMaster上的XXXManager對(duì)應(yīng),相當(dāng)于管理procedure中的subpProcedure(參與者)。
      處理用戶(hù)配置的以外,還會(huì)默認(rèn)加載RegionServerSnapshotManagerRegionServerFlushTableProcedureManager,對(duì)應(yīng)HMaster上默認(rèn)加載的SnapshotManagerMasterFlushTableProcedureManager.
    2. 調(diào)用initialize,方法中調(diào)用每一個(gè)加載的XXXManager的initialize方法完成初始化。
    3. 調(diào)用start, start方法中調(diào)用每一個(gè)XXXManager的start方法。
  • RegionServerProcedureManager
    對(duì)應(yīng)MasterProcedureManager,同樣是一個(gè)抽象類(lèi),同樣繼承自ProcedureManager??蛻?hù)端調(diào)用procedure時(shí),總是會(huì)先走到MasterProcedureManager的具體實(shí)現(xiàn)類(lèi)上執(zhí)行execProcedure,在execProcedure的實(shí)現(xiàn)邏輯里使用Procedure這個(gè)類(lèi)提供的方法完成協(xié)調(diào)者的工作。接著就是參與者RegionServerProcedureManager的工作:接受協(xié)調(diào)者發(fā)送的兩個(gè)階段的請(qǐng)求,處理請(qǐng)求,反饋給協(xié)調(diào)者。RegionServerProcedureManager有兩個(gè)主要的抽象方法需要實(shí)現(xiàn):

     public abstract void initialize(RegionServerServices rss) throws KeeperException;
    
     public abstract void start();
     這兩個(gè)方法正是RegionServerProcedureManagerHost的initialize和start時(shí)會(huì)調(diào)用的。
     由于在MasterProcedureManager的execProcedure中可以使用Procedure這個(gè)類(lèi)提供的方法完成兩個(gè)階段的協(xié)調(diào)。那么在start方法中,我們應(yīng)該可以去接受協(xié)調(diào)者的兩個(gè)階段請(qǐng)求,然后做出處理。
    
  • Subprocedure
    這是一個(gè)抽象類(lèi),同時(shí)它繼承Callable接口,參與者的重要邏輯應(yīng)該繼承Subprocedure,并實(shí)現(xiàn)它的三個(gè)主要方法:

     1. abstract public void acquireBarrier() throws ForeignException;
      prepare階段參與者接收到協(xié)調(diào)者prepare請(qǐng)求后需要在這個(gè)方法里完成主要邏輯。拋出異常意味著否定的答復(fù)。
     2. abstract public byte[] insideBarrier() throws ForeignException;
       第二階段,意味著所有參與者(也就是運(yùn)行在其他RegionServer上的Subprocdure完成了prepare階段acquireBarrier調(diào)用),進(jìn)入提交階段。
       這是一個(gè)有返回值的方法,它的返回值會(huì)傳遞給協(xié)調(diào)者,也就是Procedure。
     3. abstract public void cleanup(Exception e);
       以上兩個(gè)階段出現(xiàn)異常,都會(huì)調(diào)用這個(gè)方法完成本地的一些清理操作:資源釋放,已完成操作的回滾等等。
    
    以上方法都在call()方法中完成,作為用戶(hù)來(lái)說(shuō)不需要關(guān)心,如何接受協(xié)調(diào)者 的請(qǐng)求,以及回復(fù)協(xié)調(diào)者。call方法中完成了這些調(diào)用。
    call 方法有以下調(diào)用:
     -> acquireBarrier
     -> rpcs.sendMemberAcquired(this); 參與者答復(fù)協(xié)調(diào)者prepare肯定請(qǐng)求。
     -> waitForReachedGlobalBarrier();等待所有參與者完成prepare階段
     -> insideBarrier
     -> rpcs.sendMemberCompleted();通知協(xié)調(diào)者,當(dāng)前參與者完成第二階段  commit
    以上過(guò)程出現(xiàn)異常,都會(huì)調(diào)用cancel通知協(xié)調(diào)者取消本次procedure,再調(diào)用cleanup完成本地清理工作。
    
  • ProcedureMember
    ProcedureMemeber負(fù)責(zé)創(chuàng)建、運(yùn)行、管理subprocedure。

3.2 細(xì)節(jié) - 基于zookeeper的實(shí)現(xiàn)

盡管「3.1」中提到了master端(協(xié)調(diào)者),使用Procedure這個(gè)類(lèi)來(lái)協(xié)調(diào)兩個(gè)階段。regionserver端(參與者)在Subprocedure完成具體邏輯, 使用ProcedureMember來(lái)管理Subprocedure,但沒(méi)有涉及2PC的實(shí)現(xiàn)。
HBase目前提供了基于Zookeeper實(shí)現(xiàn)的2PC, 當(dāng)然由于運(yùn)行在HMaster和HRegionServer上的XXXProcedureMananger都完全是有用戶(hù)自己實(shí)現(xiàn),用戶(hù)也可以基于其他途徑實(shí)現(xiàn)2PC, 然后替換掉Procedure類(lèi)中的zk的實(shí)現(xiàn)。
下面主要講一下基于zookeeper的實(shí)現(xiàn),同樣分為HMaster和HRegionServer量方面。

3.2.1 HMaster端實(shí)現(xiàn)

1. Procedure
由于前文提到XXXProcedureManager主要使用Procedure類(lèi)去實(shí)現(xiàn)兩個(gè)階段的協(xié)調(diào),先看看Procedure類(lèi)。下面是Procedure的核心成員:

  1. final private String procName;
     每創(chuàng)建一個(gè)新的procedure都應(yīng)該有一個(gè)不同的名稱(chēng),客戶(hù)端rpc調(diào)用procedure時(shí)可以指定,參考前面說(shuō)的protobuf定義文件ProcedureDescription的instance成員。
  2.final private byte[] args;
     args參數(shù)是在prepare請(qǐng)求階段發(fā)送給參與者的,所有的參與者都會(huì)收到參數(shù)。
  3. final CountDownLatch acquiredBarrierLatch;
     final CountDownLatch releasedBarrierLatch;
     final CountDownLatch completedLatch;
     這三個(gè)計(jì)數(shù)同步器,分別用來(lái)等待所有參與者完成prepare,完成commit,等待procedure結(jié)束。
  4. private final List<String> acquiringMembers;
     private final List<String> inBarrierMembers;
     private final HashMap<String, byte[]> dataFromFinishedMembers;
        acquiringMembers中保存進(jìn)入prepare階段的參與者;
        inBarrierMembers完成prepare需要進(jìn)入commit階段的參與者;
        dataFromFinishedMembers保存完成commit后的參與者返回的數(shù)據(jù),參考前文Subprocedure # insideBarrier方法返回值。
  5. private ProcedureCoordinator coord;
      看類(lèi)的名字也知道Procedure使用它來(lái)完成真正的兩個(gè)階段的協(xié)調(diào)工作.
      它還負(fù)責(zé)Procedure的創(chuàng)建以及運(yùn)行。下文重點(diǎn)講一下ProcedureCoordinator。

2. ProcedureCoordinator
ProcedureCoordinator負(fù)責(zé)創(chuàng)建Procedure,并且提交Procedure到線(xiàn)程池運(yùn)行,反過(guò)來(lái)Procedure又調(diào)用ProcedureCoordinator去完成所有subprocedure的兩個(gè)階段的協(xié)調(diào)。
上文提到Procedure的兩個(gè)主要方法:

1. sendGlobalBarrierStart  -- 請(qǐng)求所有參與者prepare
2. sendGlobalBarrierReached -- 請(qǐng)求所有參與者commit

這兩個(gè)方法里面分別調(diào)用了:
1. coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));
- 通知所有參與者也就是this.acquiringMembers 進(jìn)入prepare階段
2. coord.getRpcs().sendGlobalBarrierReached(this, Lists.newArrayList(inBarrierMembers));
- 所有參與者完成prepare,通知它們進(jìn)入commit階段。

coord.getRpcs返回ProcedureCoordinatorRpcs,這是一個(gè)接口,顯然它是委派它來(lái)通知所有參與者進(jìn)入prepare,再進(jìn)入commit階段。

HBase提供了ProcedureCoordinatorRpcs一種基于Zookeeper的實(shí)現(xiàn):ZKProcedureCoordinatorRpcs 。
而且現(xiàn)在運(yùn)行在HMaster上的各種XXXProcedureManager都使用這種基于zookeeper的實(shí)現(xiàn)。

3. ZKProcedureCoordinatorRpcs
它實(shí)現(xiàn)了接口ProcedureCoordinatorRpcs,有如下核心方法:

1. boolean start(final ProcedureCoordinator listener);
  - ProcedureCoordinator在構(gòu)造函數(shù)中調(diào)用start
1. sendGlobalBarrierAcquire(Procedure procName, byte[] info, List<String> members) 
  - 通知進(jìn)入prepare階段, info是傳遞給各個(gè)參與者的信息,members即參與者名稱(chēng)。
2. sendGlobalBarrierReached(Procedure procName, List<String> members)
  - 通知進(jìn)入commit階段
3. sendAbortToMembers(Procedure procName, ForeignException cause) 
  - 參與者失敗時(shí)調(diào)用,通知自身無(wú)法完成本次procedure。
4. resetMembers(Procedure procName) 
  - 完成procedure后調(diào)用,通知參與者重置自身的狀態(tài)

下面說(shuō)說(shuō)ZKProcedureCoordinatorRpcs怎么基于zk實(shí)現(xiàn)了這些方法:
它有如下成員:

ZooKeeperWatcher watcher; -- zookeeper訪(fǎng)問(wèn)客戶(hù)端
String procedureType; -- HMaster上的XXXProcedureManager創(chuàng)建ZKProcedureCoordinatorRpcs一般使用自身getProcedureSignature返回值初始化這個(gè)域,這個(gè)字段需要保持唯一
String coordName;
-- 不是很重要,一般就是當(dāng)前運(yùn)行的server名稱(chēng)。
ProcedureCoordinator coordinator; 
-- ProcedureCoordinator實(shí)例,也就是它們ProcedureCoordinator和ZKProcedureCoordinatorRpcs互相持有對(duì)方。
  1. start方法
final public boolean start(final ProcedureCoordinator coordinator) {
    if (this.coordinator != null) {
      throw new IllegalStateException(
        "ZKProcedureCoordinator already started and already has listener installed");
    }
    this.coordinator = coordinator;

    try {
      /**
        假設(shè)procedureType是pt, ZKProcedureUtil構(gòu)造函數(shù)會(huì)在zk上創(chuàng)建下面三個(gè)node:
          1. acquiredZnode : /hbase/pt/acquired
          2. reachedZnode: /hbase/pt/reached
          3. abortZnode: /hbase/pt/abort
       所以基于zk的實(shí)現(xiàn)的2PC是通過(guò)node的改變來(lái)通知所有參與者當(dāng)前處在哪一個(gè)階段。
      */
      this.zkProc = new ZKProcedureUtil(watcher, procedureType) {
        @Override
        // nodeCreated方法會(huì)在zk上新的node創(chuàng)建時(shí)回調(diào)
        public void nodeCreated(String path) {
          //判斷一下node的路徑是不是/hbase/pt,不是的話(huà)表示node改變和當(dāng)前procedure沒(méi)關(guān)系
          if (!isInProcedurePath(path)) return;
          LOG.debug("Node created: " + path);
          logZKTree(this.baseZNode);
         // 判斷新建的node是不是符合路徑/hbase/pt/acquired/{procedureName}/{memberName}. 
         // 關(guān)于procedureName,每一個(gè)procedure調(diào)用都會(huì)有一個(gè)新的名稱(chēng).  memberName即參與者名稱(chēng)。
         // 這個(gè)路徑的創(chuàng)建表明有一個(gè)參與者完成prepare階段。后面講到參與者部分時(shí)會(huì)提到參與者完成prepare后會(huì)創(chuàng)建這個(gè)路徑。
          if (isAcquiredPathNode(path)) {
              // 通知一下一個(gè)參與者完成prepare,畢竟Procedure 還阻塞在acquiredBarrierLatch上等待參與者都完成prepare階段。
            coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
              ZKUtil.getNodeName(path));
         // 和上面的if類(lèi)似,判斷新建的node符不符合/hbase/pt/reached/{procedureName}/{memberName}. 符合表明一個(gè)參與者完成commit階段。
          } else if (isReachedPathNode(path)) {
            String procName = ZKUtil.getNodeName(ZKUtil.getParent(path));
            String member = ZKUtil.getNodeName(path);
            // get the data from the procedure member
            try {
              /**
                前面說(shuō)到Subprocedure # insideBarrier會(huì)有返回值,這個(gè)返回值被設(shè)置成node的data,此處解析出返回值。
              */
              byte[] dataFromMember = ZKUtil.getData(watcher, path);
              // ProtobufUtil.isPBMagicPrefix will check null
              if (dataFromMember != null && dataFromMember.length > 0) {
                if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
                  ForeignException ee = new ForeignException(coordName,
                    "Failed to get data from finished node or data is illegally formatted:"
                        + path);
                  coordinator.abortProcedure(procName, ee);
                } else {
                  dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
                    dataFromMember.length);
                  LOG.debug("Finished data from procedure '" + procName
                    + "' member '" + member + "': " + new String(dataFromMember));
                 /**
                    通知一個(gè)參與者完成commit,此時(shí)Procedure阻塞在releasedBarrierLatch等待所有參與者完成commit。
                 */
                  coordinator.memberFinishedBarrier(procName, member, dataFromMember);
                }
              } else {
                coordinator.memberFinishedBarrier(procName, member, dataFromMember);
              }
            } catch (KeeperException e) {
              ForeignException ee = new ForeignException(coordName, e);
              coordinator.abortProcedure(procName, ee);
            } catch (InterruptedException e) {
              ForeignException ee = new ForeignException(coordName, e);
              coordinator.abortProcedure(procName, ee);
            }
          } else if (isAbortPathNode(path)) {
            abort(path);
          } else {
            LOG.debug("Ignoring created notification for node:" + path);
          }
        }
      };
      zkProc.clearChildZNodes();
    } catch (KeeperException e) {
      ...
    }
   ...
  }
  1. sendGlobalBarrierAcquire 通知參與者prepare
// 1. info 是傳遞給進(jìn)入prepare的subprocedure的
// 2. nodeNames 即所有參與者,
final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
      throws IOException, IllegalArgumentException {
    String procName = proc.getName();

    String abortNode = zkProc.getAbortZNode(procName);
    try {
      // 檢查/hbase/pt/abort/{procedureName}是否存在,存在則表明放棄procedureName這個(gè)procedure。
      if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
        abort(abortNode);
      }
    
    } catch (KeeperException e) {
      ...
    }

    // node: /hbase/pt/acquired/{procedureName}
    String acquire = zkProc.getAcquiredBarrierNode(procName);
    
    try {
      byte[] data = ProtobufUtil.prependPBMagic(info);
      // 在zk上創(chuàng)建/hbase/pt/acquired/{procedureName}節(jié)點(diǎn)
      // 后面在說(shuō)subprocedure會(huì)提到,subprocedure會(huì)監(jiān)控這個(gè)階段判斷是不是一個(gè)新的procedure啟動(dòng)了,一旦監(jiān)控到這個(gè)節(jié)點(diǎn)參與者就進(jìn)入prepare階段
      ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
      
      for (String node : nodeNames) {
        // znode : /hbase/pt/acquired/{procedureName}/{memberName}
        String znode = ZKUtil.joinZNode(acquire, node);
        
        // 檢查/hbase/pt/acquired/{procedureName}/{memberName}是否存在,一旦存在表明memberName這個(gè)參與者完成prepare。
        if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
         //通知一個(gè)member完成prepare
          coordinator.memberAcquiredBarrier(procName, node);
        }
      }
    } catch (KeeperException e) {
      ...
    }
  }
  1. sendGlobalBarrierReached
    這里不再貼代碼,它和sendGlobalBarrierAcquire工作過(guò)程差不多:

    • 創(chuàng)建/hbase/pt/reached/{procedureName}這個(gè)node,所有監(jiān)控這個(gè)node的參與者因此進(jìn)入commit階段。
    • 對(duì)所有的member, 檢查/hbase/pt/reached/{procedureName}/{memberName}是否存在,存在表明memberName這個(gè)參與者完成commit。
  2. 總結(jié)
    基于zk的實(shí)現(xiàn), zk上會(huì)存在如下node:

    /hbase/{procedureSignature}
                          | - /required
                                   |- /{procedureName}
                                              | - /{memberName-1}
                                                 ...
                                              | - /{memberName-n}
                          | - /reached
                                   |- /{procedureName}
                                              | - /{memberName-1}
                                                 ...
                                              | - /{memberName-n}
                          | -/abort
                                 | - /{procedureName}
                                      
    1. 協(xié)調(diào)者創(chuàng)建/required/{procedureName}
    2. 參與者監(jiān)聽(tīng)到/required/{procedureName}的存在后,進(jìn)入prepare階段
    3. 參與者memberName-i調(diào)用Subprocedure # acquireBarrier
    4. 參與者memberName-i完成prepare階段,創(chuàng)建/required/{procedureName}/{memberName-i}這個(gè)node通知協(xié)調(diào)者完成prepare
    4. 協(xié)調(diào)者等待所有參與者完成prepare,也就是n個(gè)參與者創(chuàng)建了memberName-1 to memberName-n這n個(gè)子節(jié)點(diǎn)。
    5. 協(xié)調(diào)者創(chuàng)建/reached/{procedureName},通知參與者進(jìn)入commit
    ...
    

3.2.2 HRegionServer端實(shí)現(xiàn)

1.ZKProcedureMemberRpcs
3.1 # 2節(jié)中提到Subprocedure使用rpcs來(lái)響應(yīng)協(xié)調(diào)者Procedure。rpcs對(duì)應(yīng)接口ProcedureMemberRpcs, ZKProcedureMemberRpcs實(shí)現(xiàn)了這個(gè)接口,該接口有如下主要方法:

1. void start(final String memberName, final ProcedureMember member);
    - 完成一些初始化或者線(xiàn)程池啟動(dòng)任務(wù),一般是運(yùn)行在HRegionServer上的XXXProcedureManager會(huì)在其start方法中完成這個(gè)方法的調(diào)用。
2. void sendMemberAborted(Subprocedure sub, ForeignException cause) throws IOException;
    - 通知協(xié)調(diào)者放棄
3. void sendMemberAcquired(Subprocedure sub) throws IOException;
    - 通知協(xié)調(diào)者,完成prepare
4. void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException;
    - 通知協(xié)調(diào)者,完成commit
上后三個(gè)方法也就是Subprocedure使用來(lái)反饋給協(xié)調(diào)者狀態(tài)的方法。

下面是ZKProcedureMemberRpcs的核心成員:

1. protected ProcedureMember member;
   用于創(chuàng)建,運(yùn)行subprocedure

看看ZKProcedureMemberRpcs是怎么實(shí)現(xiàn)這些接口的,下面是ZKProcedureMemberRpcs構(gòu)造函數(shù):

public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
      throws KeeperException {
    // 監(jiān)控zookeeper node變化,上文協(xié)調(diào)者端‘ ZKProcedureCoordinatorRpcs’也使用了, procType也就是getProcedureSignature返回值
    this.zkController = new ZKProcedureUtil(watcher, procType) {
      // 新node創(chuàng)建是回調(diào)
      @Override
      public void nodeCreated(String path) {
        // 如果不是 /hbase/{procedureSignature}路徑,表明不是procedure相關(guān)的事件
        if (!isInProcedurePath(path)) {
          return;
        }
        // 如果是新建acquiredNode: /hbase/{procedureSignature}/required
        // 按照上文講述ZKProcedureCoordinatorRpcs會(huì)在start方法中創(chuàng)建,
        // 也就是在運(yùn)行在HMaster上的XXXProcedureManager初始化過(guò)程中創(chuàng)建。
        if (isAcquiredNode(path)) {
          // 這個(gè)方法查看acquiredNode的子節(jié)點(diǎn),由于子節(jié)點(diǎn)名字是procedureName
          //表示一個(gè)新的procedure, 這個(gè)方法里獲取子節(jié)點(diǎn),使用ProcedureMember啟動(dòng)創(chuàng)建啟動(dòng)Subprocedure.
          waitForNewProcedures();
          return;
        } else if (isAbortNode(path)) {
          // 監(jiān)控/hbase/{procedureSignature}/abort節(jié)點(diǎn)
          watchForAbortedProcedures();
          return;
        }
        String parent = ZKUtil.getParent(path);
        // 如果父節(jié)點(diǎn)是/hbase/{procedureSignature}/reached
        // 表示協(xié)調(diào)者通知procedureName這個(gè)subprocedure進(jìn)入commit階段
        // 由于ProcedureMember管理所有subprocedure,委托ProcedureMember去通知相應(yīng)subprocedure去調(diào)用insideBarrier完成commit
        if (isReachedNode(parent)) {
          receivedReachedGlobalBarrier(path);
          return;
        } else if (isAbortNode(parent)) {
          abort(path);
          return;
        } else if (isAcquiredNode(parent)) {
        // 如果父節(jié)點(diǎn)是/hbase/{procedureSignature}/required
        // 表示協(xié)調(diào)者通知procedureName這個(gè)subprocedure進(jìn)入prepare階段
        // 由于ProcedureMember管理所有subprocedure,委托ProcedureMember去通知相應(yīng)subprocedure去調(diào)用acquireBarrier完成prepare
          startNewSubprocedure(path);
        } else {
          LOG.debug("Ignoring created notification for node:" + path);
        }
      }
      
      @Override
      public void nodeChildrenChanged(String path) {
        if (path.equals(this.acquiredZnode)) {
          LOG.info("Received procedure start children changed event: " + path);
          waitForNewProcedures();
        } else if (path.equals(this.abortZnode)) {
          LOG.info("Received procedure abort children changed event: " + path);
          watchForAbortedProcedures();
        }
      }
    };
  }
  1. sendMemberAcquired
    前文提到Subprocedure調(diào)用acquireBarrier完成prepare階段后,會(huì)使用這個(gè)方法通知協(xié)調(diào)者自己完成prepare。
    同時(shí)在「3.2.1」最后提到協(xié)調(diào)者端ZKProcedureCoordinatorRpcs會(huì)在prepare階段一直監(jiān)聽(tīng) /hbase/{procedureSignature}/acquire,直到該node下n個(gè)參與者都創(chuàng)建了子節(jié)點(diǎn)memberName-i(0 <= i < n)。

顯然這個(gè)方法里就是在 /hbase/{procedureSignature}/acquire下以當(dāng)前member name創(chuàng)建新的節(jié)點(diǎn)。

  1. sendMemberCompleted
    和上面一樣,Subprocedure調(diào)用insideBarrier完成commit之后,調(diào)用這個(gè)方法在/hbase/{procedureSignature}/reached/下創(chuàng)建以當(dāng)前member name命名的新節(jié)點(diǎn)通知協(xié)調(diào)者。

以上,差不多就是HBase procedure實(shí)現(xiàn)兩階段協(xié)議的過(guò)程。
總結(jié)一下用戶(hù)如果自定義Procedure需要實(shí)現(xiàn)一下部分:

  1. master端
    • 實(shí)現(xiàn)MasterProcedureManager抽象:
      • initialize 初始化工作
      • getProcedureSignature 返回一個(gè)唯一的名稱(chēng)
      • execProcedure, 中創(chuàng)建Procedure,創(chuàng)建ProcedureCoordinator,使用ProcedureCoordinator提交運(yùn)行Procedure。ProcedureCoordinator又委托ZkProcedureCoordinatorRpcs來(lái)協(xié)調(diào)兩個(gè)階段。
  2. hregionserver端:
    • 實(shí)現(xiàn)RegionServerProcedureManager抽象類(lèi),
      • 實(shí)現(xiàn)initialize 初始化,比如創(chuàng)建ProcedureMember,需要提供給ProcedureMember一個(gè)工廠(chǎng)類(lèi)創(chuàng)建Subprocedure。 創(chuàng)建ZKProcedureMemberRpcs實(shí)例,這個(gè)實(shí)例委托ProcedureMember創(chuàng)建、管理subprocedure。
      • getProcedureSignature
        返回值應(yīng)該和對(duì)應(yīng)的Master上的XXXProcedureManager一樣
      • start方法
        啟動(dòng)ZKProcedureMemberRpcs實(shí)例,它會(huì)開(kāi)始監(jiān)聽(tīng)相關(guān)節(jié)點(diǎn)。
    • 實(shí)現(xiàn)Subprocedure抽象類(lèi)
      • 實(shí)現(xiàn)acquireBarrier方法,完成prepare階段,比如提前鎖住一些需要提交的資源
      • 實(shí)現(xiàn)insideBarrier方法,完成commit階段。

以上可以看出master和regionsever上分別使用ZkProcedureCoordinatorRpcs和ZKProcedureMemberRpcs來(lái)實(shí)現(xiàn)基于zk的2pc,用戶(hù)也可以實(shí)現(xiàn)接口ProcedureCoordinatorRpcs使用其他途徑完成2PC的過(guò)程。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容