Flink源碼1-Flink 的集群和Jobmanager啟動(dòng)

1、Flink RPC 詳解 0:5:00 ~ 0:26:00

1、ActorSystem 是管理 Actor生命周期的組件, Actor是負(fù)責(zé)進(jìn)行通信的組
2、每個(gè) Actor 都有一個(gè) MailBox,別的 Actor 發(fā)送給它的消息都首先儲存在 MailBox 中,通過這種 方式可以實(shí)現(xiàn)異步通信。
3、每個(gè) Actor 是單線程的處理方式,不斷的從 MailBox 拉取消息執(zhí)行處理,所以對于 Actor 的消息處 理,不適合調(diào)用會阻塞的處理方法。
4、Actor 可以改變他自身的狀態(tài),可以接收消息,也可以發(fā)送消息,還可以生成新的 Actor
5、每一個(gè)ActorSystem 和 Actor都在啟動(dòng)的時(shí)候會給定一個(gè) name,如果要從ActorSystem中,獲取一 個(gè) Actor,則通過以下的方式來進(jìn)行 Actor的獲?。?akka.tcp://asname@bigdata02:9527/user/actorname 6、如果一個(gè) Actor 要和另外一個(gè) Actor進(jìn)行通信,則必須先獲取對方 Actor 的 ActorRef 對象,然 后通過該對象發(fā)送消息即可。
7、通過 tell 發(fā)送異步消息,不接收響應(yīng),通過 ask 發(fā)送異步消息,得到 Future 返回,通過異步回到 返回處理結(jié)果

Flink 中的 RPC 實(shí)現(xiàn)主要在 flink-runtime 模塊下的 org.apache.flink.runtime.rpc 包中,涉及
到的最重要的 API 主要是以下這四個(gè):

1、RpcGateway 路由,RPC的老祖宗,各種其他RPC組件,都是 RpcGateWay 的子類
2、RpcServer RpcService 和 RpcEndpoint 之間的粘合層
3、RpcEndpoint 業(yè)務(wù)邏輯載體,對應(yīng)的 Actor 的封裝
4、RpcService 對應(yīng) ActorSystem 的封裝

image.png

RpcEndpoint 下面有四個(gè)比較重要的子類:
1、TaskExecutor 2、Dispatcher 3、JobMaster 4、ResourceManager

創(chuàng)建成功了之后,都會要去執(zhí)
行他的 onStart() ,在集群啟動(dòng)的源碼分析中,其實(shí)這些組件的很多的工作流程,都被放在 onStart() 里
面。

2、Flink 集群啟動(dòng)腳本分析 0: 26:00 ~ 0:37:00

flink-dist 子項(xiàng)目中,位于 flink-bin 下的 bin 目錄:啟動(dòng)腳本為:start?cluster.sh
會首先調(diào)用 config.sh 來獲取 masters 和 workers,masters 的信息,是從 conf/masters 配置
文件中獲取的, workers 是從 conf/workers 配置文件中獲取的。然后分別:
1、通過 jobmanager.sh 來啟動(dòng) JobManager
2、通過 taskmanager.sh 來啟動(dòng) TaskManager

內(nèi)部,都通過 flink-daemon.sh 腳本來啟動(dòng) JVM 進(jìn)程,分析 flink-daemon.sh 腳本發(fā)現(xiàn):

1、JobManager 的啟動(dòng)代號:standalonesession,實(shí)現(xiàn)類是: StandaloneSessionClusterEntrypoint
2、TaskManager 的啟動(dòng)代號:taskexecutor,實(shí)現(xiàn)類是:TaskManagerRunner

3、Flink 主節(jié)點(diǎn) JobManager 啟動(dòng)分析 0:37:00 ~ 3:16:00

1、ResourceManager Flink的集群資源管理器,只有一個(gè),關(guān)于slot的管理和申請等工作,都由他負(fù)責(zé) 2、Dispatcher 負(fù)責(zé)接收用戶提交的 JobGragh, 然后啟動(dòng)一個(gè) JobManager, 類似于 YARN 集群中的 AppMaster 角色,類似于 Spark Job 中的 Driver 角色
3、JobManager 負(fù)責(zé)一個(gè)具體的 Job 的執(zhí)行,在一個(gè)集群中,可能會有多個(gè) JobManager 同時(shí)執(zhí)行,類似于 YARN 集群中的 AppMaster 角色,類似于 Spark Job 中的 Driver 角色
4、WebMonitorEndpoint 里面維護(hù)了很多很多的Handler,如果客戶端通過 flink run 的方式來提交一個(gè) job 到 flink 集群,最終,是由 WebMonitorEndpoint 來接收,并且決定使用哪一個(gè) Handler 來執(zhí)行處理 submitJob ===> SubmitJobHandler

JobManager的啟動(dòng)主類:

org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint

image.png

-------------------源碼開始 1、 webMonitor 啟動(dòng) ---------------------
StandaloneSessionClusterEntrypoint#main()

/** 注釋:創(chuàng)建 StandaloneSessionClusterEntrypoint
*/
StandaloneSessionClusterEntrypoint entrypoint = new
StandaloneSessionClusterEntrypoint(configuration);

    /*************************************************
      注釋:啟動(dòng)集群的entrypoint
     *  這個(gè)方法接受的是父類 ClusterEntrypoint,可想而知其他幾種啟動(dòng)方式也是通過這個(gè)方法。
     */
    ClusterEntrypoint.runClusterEntrypoint(entrypoint);
        ——》ClusterEntrypoint#runClusterEntrypoint
                          clusterEntrypoint.startCluster();
            ——》ClusterEntrypoint#startCluster
                                             ▼
                        *  注釋: 根據(jù)配置初始化文件系統(tǒng)
         *  三種東西;
         *  1、本地    Local  客戶端的時(shí)候會用  JobGragh ===> JobGraghFile
         *  2、HDFS      FileSytem(DistributedFileSystem)
         *  3、封裝對象      HadoopFileSystem, 里面包裝了 HDFS 的 FileSYSTEM 實(shí)例對象
         */
        configureFileSystems(configuration, pluginManager);
                    *  注釋: 集群啟動(dòng)入口
             */
            runCluster(configuration, pluginManager);
                 ——》ClusterEntrypoint#runCluster
                                            ▼
                      /* 1  注釋 初始化各類服務(wù)線程池
                   initializeServices(configuration, pluginManager);
                     
                   /* 2  初始化 4大 工廠實(shí)例             
         *   DispatcherRunnerFactory = DefaultDispatcherRunnerFactory
         *  ResourceManagerFactory = StandaloneResourceManagerFactory
         *   RestEndpointFactory(WenMonitorEndpoint的工廠) = SessionRestEndpointFactory
         *  返回值:DefaultDispatcherResourceManagerComponentFactory
         *  內(nèi)部包含了這三個(gè)工廠實(shí)例,就是三個(gè)成員變量
         *  再補(bǔ)充一個(gè):dispatcherLeaderProcessFactoryFactory = SessionDispatcherLeaderProcessFactoryFactory
         */
        final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory =             createDispatcherResourceManagerComponentFactory(configuration);

clusterComponent = dispatcherResourceManagerComponentFactory.create(

***1重點(diǎn)方法 DefaultDispatcherResourceManagerComponentFactory#create

(163行) webMonitorEndpoint.start(); //

------------------- 2 、Resourcemanager啟動(dòng)部分 2:14:00 ----------------

167行)resourceManager = resourceManagerFactory.createResourceManager

ResourceManagerFactory#createResourceManager

StandaloneResourceManagerFactory#createResourceManager

ResourceManager#構(gòu)造函數(shù)
當(dāng)執(zhí)行完畢這個(gè)構(gòu)造方法的時(shí)候,會觸發(fā)調(diào)用 onStart() 方法執(zhí)行

FencedRpcEndpoint#構(gòu)造函數(shù)

RpcEndpoint#構(gòu)造函數(shù)

this.rpcServer = rpcService.startServer(this);

AkkaRpcService#startServer
.......▲▲(回到ResourceManager類Onstart)
ResourceManager#Onstart() —— 2:24:00
——》ResourceManager#startResourceManagerServices

// TODO_MA 注釋: 注意這個(gè) this 對象
// TODO_MA 注釋: 執(zhí)行選舉,成功之后,調(diào)用 leaderElectionService.isLeader()
// TODO_MA 注釋: this = ResourceManager
leaderElectionService.start(this);

ZooKeeperLeaderElectionService#start

/*** 注釋: Fink 的 選舉,和 HBase 一樣都是通過 ZooKeeper 的 API 框架 Curator 實(shí)現(xiàn)的
* 1、leaderLatch.start(); 事實(shí)上就是舉行選舉
* 2、當(dāng)選舉結(jié)束的時(shí)候:
* 如果成功了: isLeader()
* 如果失敗了: notLeader()
/
leaderLatch.addListener(this);
leaderLatch.start();
——》ZooKeeperLeaderElectionService#isLeader()

/
* 注釋: 分配 LeaderShip
* leaderContender = JobManagerRunnerImpl
* leaderContender = ResourceManager
* leaderContender = DefaultDispatcherRunner
* leaderContender = WebMonitorEndpoint
*
* leaderElectionService.start(this);
* leaderContender = this
/
leaderContender.grantLeadership(issuedLeaderSessionID);

ResourceManager#grantLeadership
(ignored) -> tryAcceptLeadership
↓ 實(shí)現(xiàn)類為 ResourceManager
——》ResourceManager# tryAcceptLeadership

/
注釋: 啟動(dòng)服務(wù)
* 1、啟動(dòng)心跳服務(wù) 啟動(dòng)兩個(gè)定時(shí)任務(wù)
* 2、啟動(dòng) SlotManager 服務(wù) 啟動(dòng)兩個(gè)定時(shí)任務(wù)
/
startServicesOnLeadership();

/

* 注釋: 開啟心跳服務(wù)
*/
startHeartbeatServices();

                                          /*   注釋: 啟動(dòng) SlotManagerImpl
                                              *  這個(gè)里面只是開啟了兩個(gè)定時(shí)任務(wù)而已
                                              */
                         slotManager.start(getFencingToken(), getMainThreadExecutor(), new  
     .                                 ......▲▲▲▲(回到DefaultDispatcherResourceManagerComponentFactory)

------------------- 3、 Dispatcher啟動(dòng)部分 2:47 :00 ----------------
回到 : DefaultDispatcherResourceManagerComponentFactory#create 方法

dispatcherRunner = dispatcherRunnerFactory .createDispatcherRunner(highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler,// TODO_MA 注釋: 注意第三個(gè)參數(shù)
new HaServicesJobGraphStoreFactory(highAvailabilityServices), ioExecutor, rpcService, partialDispatcherServices);

DefaultDispatcherRunnerFactory#createDispatcherRunner
return DefaultDispatcherRunner.create
——》DefaultDispatcherRunner#create
return DispatcherRunnerLeaderElectionLifecycleManager.createFor
——》DispatcherRunnerLeaderElectionLifecycleManager#createFor
return new DispatcherRunnerLeaderElectionLifecycleManager<>(dispatcherRunner, leaderElectionService);
——》DispatcherRunnerLeaderElectionLifecycleManager構(gòu)造函數(shù)
/*** * 又開啟了選舉 ,同上resourcemanager相同
* 這個(gè)選舉服務(wù)對象 leaderElectionService 內(nèi)部的 leaderContender 是 :
** DefaultDispatcherRunner
leaderElectionService.start(dispatcherRunner);

——》同上節(jié)一樣啟動(dòng) ZooKeeperLeaderElectionService#isLeader()
leaderContender.grantLeadership(issuedLeaderSessionID);
↓ 實(shí)現(xiàn)類為 DefaultDispatcherRunner
DefaultDispatcherRunner#grantLeadership
* 注釋: 開啟 Dispatcher 服務(wù)
runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID));
——》DefaultDispatcherRunner#startNewDispatcherLeaderProcess
thenRun(newDispatcherLeaderProcess::start))

AbstractDispatcherLeaderProcess#start
——》AbstractDispatcherLeaderProcess#startInternal
onStart()

SessionDispatcherLeaderProcess#onStart()

/* 注釋: 開啟服務(wù): 啟動(dòng) JobGraghStore
* 一個(gè)用來存儲 JobGragh 的存儲組件
/
startServices();
// TODO_MA 注釋: 到現(xiàn)在為止,依然還沒有啟動(dòng) Dispatcher
.thenAccept(this::createDispatcherIfRunning)
——》SessionDispatcherLeaderProcess#createDispatcherIfRunning
——》 SessionDispatcherLeaderProcess#createDispatcher
final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create(

DefaultDispatcherGatewayServiceFactory#create

/
注釋: 創(chuàng)建 Dispatcher
dispatcher = dispatcherFactory.createDispatcher(rpcService, fencingToken, bootstrap,

SessionDispatcherFactory#createDispatcher
return new StandaloneDispatcher(rpcService, fencingToken, dispatcherBootstrap,

StandaloneDispatcher構(gòu)造函數(shù)
↓ StandaloneDispatcher為RPCendpoint類
Dispatcher 類 Onstart方法

* 注釋: 啟動(dòng) Dispatcher 服務(wù)
*/
startDispatcherServices();
* 注釋: 引導(dǎo)程序初始化
* 把所有中斷的 job 恢復(fù)執(zhí)行
*/
dispatcherBootstrap.initialize(this,

DefaultDispatcherBootstrap#initialize
launchRecoveredJobGraphs(dispatcher, recoveredJobs);

AbstractDispatcherBootstrap#launchRecoveredJobGraphs
* 注釋: 恢復(fù)執(zhí)行 待恢復(fù)的 Job
*/
dispatcher.runRecoveredJob(recoveredJob);

* 注釋: 調(diào)用 runJob 運(yùn)行一個(gè)任務(wù)
FutureUtils.assertNoException(runJob(recoveredJob).handle
——》Dispatcher#runJob
......▲▲▲▲(回到DefaultDispatcherResourceManagerComponentFactory)結(jié)束 3:17 :00

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容