Apache Flink源碼解析 (八)Flink的HighAvailabilityServices

概述

Flink內(nèi)部有一些服務(wù)是JobMaster和TaskExecutor共用的。如HighAvailabilityServices, RpcService, ActorSystem(MetricQueryService), HeartbeatServices, MetricRegistryImpl, BlobCacheService.

HighAvailabilityServices 的作用

  • 提供 Leader 獲取服務(wù)(ResourceManager, Dispatcher, JobManager, WebMonitor)
  • 提供 Leader 選舉服務(wù)(同上)
  • 提供Checkpoint恢復(fù)工廠類(獲取已完成的Checkpoint的元信息,以及Checkpoint計(jì)數(shù)器)
  • 提供SubmittedJobGraphStore,用來(lái)保存和恢復(fù)JobGraph
  • 提供大文件(Blob)的高可用存儲(chǔ)
  • 提供(RunningJobsRegistry),任務(wù)狀態(tài)信息的保存與獲取

HighAvailabilityServices 的使用者

  • ClusterEntrypoint
  • TaskManagerRunner
  • ClusterClient

HighAvailabilityServices 的創(chuàng)建

  • HighAvailabilityServices 的創(chuàng)建是通過(guò)HighAvailabilityServicesUtils這個(gè)工具類,這個(gè)工具類提供了兩個(gè)重要的靜態(tài)方法來(lái)生成HighAvailabilityServices 。

    • 第一個(gè)是 createAvailableOrEmbeddedServices(Configuration config, Executor executor),主要用于創(chuàng)建MiniCluster,服務(wù)于測(cè)試和本地運(yùn)行。

    • 第二個(gè)是 createHighAvailabilityServices(Configuration configuration, Executor executor, AddressResolution addressResolution),相比于第一個(gè)方法,它的參數(shù)還需要AddressResolution 。

      /**
       * Enum specifying whether address resolution should be tried or not when creating the
       * {@link HighAvailabilityServices}.
       */
      public enum AddressResolution {
          TRY_ADDRESS_RESOLUTION,
          NO_ADDRESS_RESOLUTION
      }
      
      • TRY_ADDRESS_RESOLUTION和NO_ADDRESS_RESOLUTION,分別代表是否需要解析地址。用于在非HA環(huán)境下的直接解析地址,如果hostname不存在則快速失敗。在ClusterEntrypoint中由于是本地,不需要解析,而在TaskManagerRunner與ClusterClient中使用了TRY_ADDRESS_RESOLUTION,因?yàn)橐粋€(gè)是負(fù)責(zé)執(zhí)行具體任務(wù),另一個(gè)則是用戶的客戶端。
      • 這個(gè)方法首先獲取高可用模式(HighAvailabilityMode),分別是無(wú)高可用,基于Zookeeper的高可用,以及自己定制的高可用模式。
        public enum HighAvailabilityMode {
            NONE(false),
            ZOOKEEPER(true),
            FACTORY_CLASS(true);
        }
        
      • 其中None模式JobManager地址是固定的,所以直接從Configuration中獲取地址并生成一個(gè)StandaloneHaServices。
      • Zookeeper模式會(huì)先創(chuàng)建BlobStorService,就是一個(gè)高可用的大文件持久化服務(wù),這個(gè)服務(wù)將文件保存在high-availability.storageDir配置的位置,并在Zookeeper上保存元信息。
        case ZOOKEEPER:
            BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
            return new ZooKeeperHaServices(
                      ZooKeeperUtils.startCuratorFramework(configuration),
                      executor,
                      configuration,
                      blobStoreService);
        
      • Custom模式需要用戶自己實(shí)現(xiàn)HighAvailabilityServicesFactory

LeaderElectionService & LeaderRetrievalService

  • LeaderElectionService和LeaderRetrievalService分別提供了某個(gè)組件參加Leader選舉和獲取其他組件Leader的功能。(組件包括ResourceManager, Dispatcher, JobManager, WebMonitor)。
  • LeaderElectionService

    • 接口如下所示, start方法就是將當(dāng)前的組件加入Leader選舉,上述四個(gè)組件都是現(xiàn)了LeaderContender接口。

    • 當(dāng)某個(gè)組件被選舉為leader時(shí),會(huì)回調(diào)該組件實(shí)現(xiàn)的grantLeadership方法(第一次被選舉為leader),當(dāng)某個(gè)組件不再是leader時(shí),會(huì)回調(diào)該組件實(shí)現(xiàn)的revokeLeadership方法。

      public interface LeaderElectionService {
        void start(LeaderContender contender) throws Exception;
        void stop() throws Exception;
        void confirmLeaderSessionID(UUID leaderSessionID);
        boolean hasLeadership(@Nonnull UUID leaderSessionId);
      }
      public interface LeaderContender {
        void grantLeadership(UUID leaderSessionID);
        void revokeLeadership();
        String getAddress();
        void handleError(Exception exception);
      }
      
  • LeaderRetrievalService

    • LeaderRetrievalService 非常簡(jiǎn)潔,提供了start和stop方法,并且start方法只能被調(diào)用一次,在ZK模式中因?yàn)樗粫?huì)監(jiān)聽(tīng)一條ZK上的路徑(即一個(gè)組件的變化)。
    • 在啟動(dòng)LeaderRetrievalService的方法中需要接收參數(shù)LeaderRetrievalListener,將實(shí)現(xiàn)這個(gè)接口的類的實(shí)例作為參數(shù)傳入這個(gè)方法,在相應(yīng)組件leader發(fā)生變化時(shí)會(huì)回調(diào)notifyLeaderAddress方法,在LeaderRetrievalService拋出異常的時(shí)候會(huì)調(diào)用handleError方法。
      public interface LeaderRetrievalService {
        void start(LeaderRetrievalListener listener) throws Exception;
        void stop() throws Exception;
      }
      public interface LeaderRetrievalListener {
        void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);
        void handleError(Exception exception);
      }
      

HighAvailabilityServices 的典型實(shí)現(xiàn) ZooKeeperHaServices

  • ZooKeeperHaServices的Constructor需要接受四個(gè)參數(shù),分別為CuratorFramework, Executor, Configuration, BlobStoreService

  • 在HighAvailabilityServices創(chuàng)建中,已經(jīng)介紹了BlobStoreService的作用,此處要再介紹一下是創(chuàng)建CuratorFramework的方法ZooKeeperUtils.startCuratorFramework(configuration)

  • CuratorFramework創(chuàng)建

    • 下圖是如何通過(guò)Builder創(chuàng)建CuratorFramework,詳情可以閱讀Zookeeper客戶端Curator使用詳解
      一文,這里會(huì)介紹這些參數(shù)是如何配置的
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(zkQuorum)
                .sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectionTimeout)
                .retryPolicy(new ExponentialBackoffRetry(retryWait, maxRetryAttempts))
                // Curator prepends a '/' manually and throws an Exception if the
                // namespace starts with a '/'.
                .namespace(rootWithNamespace.startsWith("/") ? rootWithNamespace.substring(1) : rootWithNamespace)
                .aclProvider(aclProvider)
                .build();
      
      • zkQuorum對(duì)應(yīng)配置中的high-availability.zookeeper.quorum,即Zookeeper的地址
      • sessionTimeout對(duì)應(yīng)配置中的high-availability.zookeeper.client.session-timeout,單位為毫秒,默認(rèn)60000即一分鐘,ZK會(huì)話的超時(shí)時(shí)間
      • connectionTimeout對(duì)應(yīng)配置中的high-availability.zookeeper.client.connection-timeout,單位為毫秒,默認(rèn)15000即15秒,ZK的連接超時(shí)時(shí)間
      • 重試策略為ExponentialBackoffRetry,從概率上來(lái)講隨著重試次數(shù)越來(lái)越多,重試間隔呈指數(shù)級(jí)增長(zhǎng)
        • retryWait對(duì)應(yīng)配置中的high-availability.zookeeper.client.retry-wait,即基礎(chǔ)的間隔時(shí)間
        • maxRetryAttempts對(duì)應(yīng)配置中的high-availability.zookeeper.client.max-retry-attempts,即最大重試次數(shù)
      • rootWithNamespace由root和namespace(clusterId)拼成,root對(duì)應(yīng)配置中的high-availability.zookeeper.path.root,默認(rèn)為/flink, namespace對(duì)應(yīng)配置中的high-availability.cluster-id, 在Yarn模式下也就是applicationId
      • aclProvider默認(rèn)使用DefaultACLProvider,相關(guān)的配置有zookeeper.sasl.disable(默人false)和high-availability.zookeeper.client.acl(默認(rèn)open)
  • Executor是用來(lái)執(zhí)行ZooKeeperCompletedCheckpointStore移除CompletedCheckpoints的任務(wù)的。

  • 在介紹LeaderElectionService和LeaderRetrievalService的ZK實(shí)現(xiàn)之前,先看一個(gè)flink cluster在zookeeper中的目錄結(jié)構(gòu),如下圖


    ZK目錄結(jié)構(gòu)
  • ZookeeperLeaderElectionService

    • 接口

      • ZooKeeperLeaderElectionService除了實(shí)現(xiàn)LeaderElectionService以外,還實(shí)現(xiàn)了LeaderLatchListener,NodeCacheListener,UnhandledErrorListener三個(gè)屬于curator的接口。
      • LeaderLatchListener需要實(shí)現(xiàn)類實(shí)現(xiàn)兩個(gè)回調(diào)方法,如下
        public interface LeaderLatchListener {
            void isLeader();
            void notLeader();
        }
        
        • 當(dāng)被監(jiān)聽(tīng)的對(duì)象(此處即為該ZookeeperLeaderElectionService實(shí)例)被選為leader時(shí),isLeader實(shí)現(xiàn)的邏輯會(huì)被調(diào)用
        • 當(dāng)失去leader位置時(shí),notLeader會(huì)被調(diào)用
      • NodeCacheListener只有一個(gè)方法,如下
        public interface NodeCacheListener {
            void nodeChanged() throws Exception;
        }
        
        • 當(dāng)監(jiān)測(cè)的節(jié)點(diǎn)狀態(tài)發(fā)生變化時(shí),nodeChanged會(huì)被調(diào)用,在此處是保存了LeaderContender地址和LeaderSessionID的節(jié)點(diǎn)
      • UnhandledErrorListener接口需要實(shí)現(xiàn)一個(gè)方法,如下
        public interface UnhandledErrorListener {
            void unhandledError(String var1, Throwable var2);
        }
        
        • 當(dāng)后臺(tái)操作發(fā)生異常時(shí)觸發(fā)unhandledError方法,在flink各個(gè)組件的實(shí)現(xiàn)中也把這當(dāng)做fatal error來(lái)處理
    • 創(chuàng)建

      • ZookeeperLeaderElectionService的創(chuàng)建通過(guò)工具類ZookeeperUtils的createLeaderElectionService方法,如下。
      public static ZooKeeperLeaderElectionService createLeaderElectionService(
              final CuratorFramework client,
              final Configuration configuration,
              final String pathSuffix) {
          final String latchPath = configuration.getString(
              HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) + pathSuffix;
          final String leaderPath = configuration.getString(
              HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;
      
          return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
      }
      
      • 該方法接受的參數(shù)其中client(CuratorFramework)的創(chuàng)建上一個(gè)小節(jié)介紹了。還有pathSuffix則對(duì)應(yīng)的是各個(gè)組件,分別如下, 與leader和leaderlatch目錄下一一對(duì)應(yīng)
        private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";
        private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock";
        private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
        private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock";
        
      • 方法體中的HA_ZOOKEEPER_LATCH_PATH對(duì)應(yīng)flink配置中的high-availability.zookeeper.path.latch,默認(rèn)值為/leaderlatch,HA_ZOOKEEPER_LEADER_PATH對(duì)應(yīng)flink配置中的high-availability.zookeeper.path.leader,默認(rèn)為/leader。此處latchpath與leaderpath就與上圖中flink集群在zk下的目錄一一對(duì)應(yīng)了起來(lái)。
      • 在ZookeeperLeaderElectionService的構(gòu)造方法如下
        public ZooKeeperLeaderElectionService(CuratorFramework client, String latchPath, String leaderPath) {
            this.client = Preconditions.checkNotNull(client, "CuratorFramework client");
            this.leaderPath = Preconditions.checkNotNull(leaderPath, "leaderPath");
        
            leaderLatch = new LeaderLatch(client, latchPath);
            cache = new NodeCache(client, leaderPath);
        
            issuedLeaderSessionID = null;
            confirmedLeaderSessionID = null;
            leaderContender = null;
        
            running = false;
        }
        
        • 其中LeaderLatch是Curator的針對(duì)某一條zk路徑的leader選舉實(shí)現(xiàn),NodeCache是Curator監(jiān)控某一條zk路徑的變化的實(shí)現(xiàn),在此處只是分別根據(jù)latchpath和leaderpath初始化了對(duì)象,還沒(méi)有啟動(dòng)監(jiān)聽(tīng)。
        • 有兩個(gè)重要的類型為UUID的成員變量被初始化為null,分別是issuedLeaderSessionID和confirmedLeaderSessionID。這兩個(gè)變量在leader選舉過(guò)程中起到非常重要的作用。
    • 啟動(dòng)

      • 在啟動(dòng)LeaderElectionService時(shí),會(huì)將實(shí)現(xiàn)LeaderContender(參與選舉)的實(shí)例傳入,基于zk的方法實(shí)現(xiàn)如下
      public void start(LeaderContender contender) throws Exception {
          Preconditions.checkNotNull(contender, "Contender must not be null.");
          Preconditions.checkState(leaderContender == null, "Contender was already set.");
          LOG.info("Starting ZooKeeperLeaderElectionService {}.", this);
          synchronized (lock) {
              client.getUnhandledErrorListenable().addListener(this);
              leaderContender = contender;
              leaderLatch.addListener(this);
              leaderLatch.start();
              cache.getListenable().addListener(this);
              cache.start();
              client.getConnectionStateListenable().addListener(listener);
              running = true;
          }
      }
      
      • 在啟動(dòng)方法中,將當(dāng)前LeaderElection的對(duì)象作為L(zhǎng)istener加入LeaderLatch,NodeCache和CuratorFramework的UnhandleError中,并啟動(dòng)前兩個(gè)服務(wù),并將Running置為true。
    • 過(guò)程

      • 過(guò)程主要包含了被選舉為leader,不再是leader和Cache節(jié)點(diǎn)改變
        • 被選舉為leader,如接口小節(jié)所述,isLeader方法會(huì)被調(diào)用,此時(shí)會(huì)生成一個(gè)UUID作為issuedLeaderSessionID,并作為調(diào)用LeaderContender(參與選舉的組件)的grantLeadership方法的參數(shù)。而LeaderContender則會(huì)通過(guò)confirmedLeaderSessionID來(lái)進(jìn)行確認(rèn),只有與issuedLeaderSessionID相同,confirmedLeaderSessionID才會(huì)更新,并將leader信息寫入對(duì)應(yīng)的leaderPath的節(jié)點(diǎn)中。
        • 不再是leader,如接口小節(jié)所述,notLeader方法會(huì)被調(diào)用,此時(shí)會(huì)將issuedLeaderSessionID和confirmLeaderSessionID置為null,并調(diào)用LeaderContender的revokeLeadership方法通知該組件已經(jīng)失去leader位置。
        • Cache節(jié)點(diǎn)改變時(shí),nodeChanged方法會(huì)被調(diào)用,首先判斷是否為leader,如果是的話則判斷confirmedLeaderSessionID是否為空,如果不為空則將其連同LeaderContender的地址寫入leaderpath下的zk臨時(shí)節(jié)點(diǎn)。
    • 停止

      • 在停止方法中LeaderContender將退出選舉。具體實(shí)現(xiàn)是將啟動(dòng)方法中添加的listener移除并關(guān)閉LeaderLatch和NodeCache,并將成員變量的引用置null。
  • 創(chuàng)建ZookeeperLeaderRetrievalService

    • 接口,ZooKeeperLeaderRetrievalService實(shí)現(xiàn)了LeaderRetrievalService,NodeCacheListener和UnhandledErrorListener接口,這三個(gè)接口在上文都已經(jīng)介紹過(guò)。
    • 創(chuàng)建
      • 因?yàn)長(zhǎng)eaderRetrievalService功能相對(duì)比較簡(jiǎn)單,只需要在leader切換時(shí)獲取相關(guān)組件的Leader的地址和leaderSessionID,所以只創(chuàng)建了NodeCache來(lái)監(jiān)測(cè)retrievalPath的變化(此處retrievalPath與參與選舉的組件的leaderPath)相同,并緩存了lastLeaderAddress和lastLeaderSessionID,防止在leader并沒(méi)有改變的情況下觸發(fā)listener的notifyLeaderAddress。
    • 啟動(dòng)
      • 啟動(dòng)方法將Listener加入U(xiǎn)nhandledError和NodeCache的監(jiān)聽(tīng)并啟動(dòng)NodeCache,在CuratorFramework出錯(cuò)或者監(jiān)測(cè)的retrievalPath節(jié)點(diǎn)發(fā)生變化或能收到回調(diào)。
    • 過(guò)程
      • 當(dāng)監(jiān)測(cè)的retrievalPath發(fā)生變化時(shí),nodeChanged會(huì)被調(diào)用,在該方法體中,會(huì)從這個(gè)NodeCache(zk節(jié)點(diǎn))中獲取數(shù)據(jù),與lastLeaderAddress和lastLeaderSessionID進(jìn)行比對(duì),如果發(fā)生變化會(huì)更新這兩個(gè)變量并調(diào)用Listner的notifyLeaderAddress,通知新的leader地址與leaderSessionID.
    • 停止
      • 在停止方法中中止監(jiān)聽(tīng),具體實(shí)現(xiàn)是將listener移除,并關(guān)閉NodeCache。
  • CheckpointRecoveryFactory

    • CheckpointRecoveryJob 一是提供了根據(jù)JobID和maxNumberOfCheckpointsToRetain(也就是保存的歷史checpkpoint文件的個(gè)數(shù))來(lái)生成CompletedCheckpointStore的方法,二是提供了根據(jù)JobID生成CheckpointIDCounter的方法。在本文中不會(huì)多做介紹,后續(xù)如果寫到失敗恢復(fù)的文章的話會(huì)詳細(xì)介紹。CompletedCheckpointStore本質(zhì)上主要是提供獲取高可用存儲(chǔ)下備份的JobGraph進(jìn)行任務(wù)恢復(fù)的方法。
    • ZookeeperCheckpointRecoveryJob的提供CompletedCheckpointStore的實(shí)現(xiàn)中具體存儲(chǔ)方式是將在高可用文件系統(tǒng)(如HDFS)上保存的Checkpoint文件的地址存儲(chǔ)在/flink/cluster_id/checkpoints/路徑下。其中ZK的路徑由配置中的high-availability.zookeeper.path.checkpoints參數(shù)來(lái)制定,文件系統(tǒng)上存儲(chǔ)的路徑由配置中的high-availability.storageDir指定。
    • ZooKeeperCheckpointRecoveryFactory中提供CheckpointIDCounter是通過(guò)Curator的SharedCount來(lái)實(shí)現(xiàn)的,是一個(gè)高可用的計(jì)數(shù)器,路徑由配置中high-availability.zookeeper.path.checkpoint-counter來(lái)指定,默認(rèn)是/checkpoint-counter
  • SubmittedJobGraphStore

    • SubmittedJobGraphStore提供了將JobGraph高可用文件系統(tǒng)上的保存和移除功能,以及根據(jù) JobID獲取所要恢復(fù)的任務(wù)的JobGraph功能。但是在zk的目錄和hdfs上的目錄下我都沒(méi)有找到相應(yīng)的文件,這邊先略過(guò),有機(jī)會(huì)補(bǔ)上。
  • RunningJobsRegistry

    • ZK實(shí)現(xiàn)RunningJobsRegistry負(fù)責(zé)在ZK節(jié)點(diǎn)上登記所有集群中運(yùn)行的Job的狀態(tài),三種狀態(tài)分別為RUNNING,PENDING和FINISHED。ZK上的路徑可以通過(guò)high-availability.zookeeper.path.running-registry來(lái)指定。

總結(jié)

  • 本文簡(jiǎn)單的介紹了一下Flink高可用服務(wù)的功能,和基于ZK的典型實(shí)現(xiàn)。其中SubmittedJobGraphStore部分的實(shí)際運(yùn)行不符合我的預(yù)期,后續(xù)有機(jī)會(huì)更正。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容