Flink on Yarn模式啟動(dòng)流程源代碼分析

Flink on yarn的啟動(dòng)流程可以參見(jiàn)前面的文章 Flink on Yarn啟動(dòng)流程,下面主要是從源碼角度看下這個(gè)實(shí)現(xiàn),可能有的地方理解有誤,請(qǐng)給予指正,多謝。

--> 1.命令行啟動(dòng)yarn session

bin/yarn-session.sh -n 3 -jm 1024 -nm 1024 -st
我們?nèi)タ聪聠?dòng)腳本

  $JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR" $log_setting  org.apache.flink.yarn.cli.FlinkYarnSessionCli  -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"

主要是用java -cp的方式啟動(dòng)主類(lèi)** *org.apache.flink.yarn.cli.FlinkYarnSessionCli * $@ 就是我們傳入的哪些參數(shù) " -n 3 -jm 1024 -nm 1024 -st" **。

1. FlinkYarnSessionCli 的啟動(dòng)流程分析

首先看下Main函數(shù)

public static void main(String[] args) {   
FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
   System.exit(cli.run(args));
}

主要是構(gòu)造FlinkYarnSessionCli,然后執(zhí)行其run方法,這里主要介紹主要流程的代碼。

public int run(String[] args)

  • 1.解析命令行參數(shù)

    cmd = parser.parse(options, args)
    
  • 2.根據(jù)命令行參數(shù)決定執(zhí)行那種模式。

     # 第一種,判斷命令是否包含 -q
    


** 如: **

# 第二種,判斷是否有-id參數(shù)

這里我們看下交互模式是啥樣的,共有兩個(gè)可選項(xiàng),help和stop,如果我們敲入stop,則應(yīng)用對(duì)應(yīng)的所有進(jìn)程會(huì)退出。

# 第三種,為正常模式


** 這里主要為構(gòu)造YarnClusterDescriptor,然后調(diào)用其deploy方法啟動(dòng)集群 ,接著將Jobmanager和web ui地址寫(xiě)入到out文件中去,如果采用分離模式,則等待集群?jiǎn)?dòng)之后yarn session自動(dòng)退出,如果不是則進(jìn)入交互模式,我們可以通過(guò)交互控制這個(gè)Applitcation **

接著看下是如何構(gòu)造YarnClusterDescriptor的

----------------- **1 creat YarnClusterDescriptor ** ----------------------

直接new YarnClusterDescriptor對(duì)象,然后將依賴(lài)jar地址,配置參數(shù)如taskmanager個(gè)數(shù),jar地址,配置文件地址,配置參數(shù)等設(shè)置到Y(jié)arnClusterDescriptor對(duì)象中去,然后返回這個(gè)對(duì)象。

------------** 2 YarnClusterDescriptor deploy ** -------------------------

由于YarnClusterDescriptor沒(méi)有重寫(xiě)depoy方法則直接調(diào)用其父類(lèi)AbstractYarnClusterDescriptor的deploy方法,但是最終調(diào)用的是其deployInternal方法.

接著看下deployInternal方法,簡(jiǎn)單的描述下流程,后續(xù)代碼分析下面的github地址

  • 檢查是否具備Deploy的條件,如配置文件,jar路徑是否為空
  • 獲取yarn的client,用戶(hù)和RM進(jìn)行通信
  • 增加動(dòng)態(tài)的配置屬性到配置conf對(duì)象中去,解析配置conf對(duì)象為kv對(duì)
  • 獲取HDFS FileSyetem,這里用于將本地jar及配置文件上傳到HDFS,
  • 判斷JobManager和TaskManager申請(qǐng)的資源是否滿足yarn分配單個(gè)container的最小分配,如果小于則將container最小分配用來(lái)初始化jobMananger和TaskMananer
  • 通過(guò)yarn client創(chuàng)建Application,返回GetNewApplicationResponse對(duì)象用于跟RM進(jìn)行RPC通信。
  • 通過(guò)GetNewApplicationResponse對(duì)象獲取RM能夠?yàn)檫@個(gè)應(yīng)用分配的最大資源,如果最大資源不能夠滿足jobManagerMemoryMb和taskManagerMemoryMb則報(bào)錯(cuò),計(jì)算總的jobmanager和所有taskmanager總共需要的資源(jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount),計(jì)算RM中總共的空閑資源,判斷空閑資源是否滿足前面計(jì)算需要的需求,如果不滿足,則可能先啟動(dòng)yarn session,task manager等到有資源再進(jìn)行啟動(dòng);先為jobManager分配一個(gè)nm,然后再在其他的nm上啟動(dòng)taskmanager
  • 設(shè)置啟動(dòng)ApplicationMaster的 lanchcontext,這里主要是設(shè)置java home,主類(lèi),jvm參數(shù)數(shù),log文件配置。ApplicationMaster的主類(lèi) YarnApplicationMasterRunner ** YarnApplicationMasterRunner **。
protected Class<?> getApplicationMasterClass() {   
return YarnApplicationMasterRunner.class;
}```
- 設(shè)置ApplicationSubmissionContext,獲取ApplicationId
- 設(shè)置session需要的hdfs路徑,然后將本地jar包及配置文件,配置文件上傳到HDFS
- 設(shè)置AM啟動(dòng)的token信息,設(shè)置AM啟動(dòng)的過(guò)程中需要從hdfs下載那些依賴(lài)的jar和配置文件,設(shè)置ApplicationMaster及Flink及其他進(jìn)程的classpath,不多說(shuō)
- 設(shè)置鉤子函數(shù)在deploy的時(shí)候清理上傳到hdfs的文件及本地下載的依賴(lài)文件
- *** 重點(diǎn),提交Applicaiton到RM;設(shè)置這個(gè)Application的狀態(tài)為NEW,然后監(jiān)控這個(gè)應(yīng)用,如果不是之前的NEW狀態(tài),則打印當(dāng)前狀態(tài),如果Running狀態(tài)則跳出這個(gè)循環(huán),如果是其他狀態(tài),則拋出YarnDeploymentException異常,上層調(diào)用捕獲處理吧,不然250ms判斷一次 ***
- depoly成功,鉤子函數(shù)刪除臨時(shí)文件,如依賴(lài)的jar包和配置文件等,返回YarnClusterClient對(duì)象,包含了這YarnClusterDescriptor,ApplicationReport等重要的屬性。
***
***deploy 成功以后進(jìn)入交互模式,在runInteractiveCli里面最重要的一步是構(gòu)造ApplicationClient Actor用于和JobManager Actor進(jìn)行通信,但是如果發(fā)送 RegisterInfoMessageListener、UnRegisterInfoMessageListener等消息,將會(huì)由jobmanager actor將forward方法路由到flink resource manager actor去處理,此時(shí)jobmanager作為flink resource manager的代理,此時(shí)收到這兩個(gè)消息的時(shí)候,由于是forward的方法,sender仍然是application client actor,所以flink manager resource actor可以直接給application client返回消息***
***
> ------------ ** 3 代碼展示主要流程**------
![](http://upload-images.jianshu.io/upload_images/3249301-d22456f0939a8365.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-9c80aa18467d4e10.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-85a2f462ff96e5fd.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-a3c81e3dc9b23db0.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-bf190e6a72366f0d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-57eb01f090d38dd3.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-d548d544dbd1b713.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-2013feca33032c46.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-d0d8c8c1a56f28ff.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-be3a228edeffad9d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
*** ---- ApplicationClient 和JobManager Actor通信代碼 --***

![](http://upload-images.jianshu.io/upload_images/3249301-56371ec18930ba4f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-ed28091d44dc3906.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-1a0df11e1a57941d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-913d82bf6d5825b8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-6309a49886d0cc4e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-47f86bcfdee4f967.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-6a483d4af26931cc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-4de478f435cb1356.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-27986c3659bd96cc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-a27f89acbe3406de.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-f04a3da4f97a08dc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

## 2. YarnApplicationMasterRunner 啟動(dòng)流程分析
*** RM首先分配一個(gè)NM的container去啟動(dòng)YarnApplicationMasterRunner ,接著下來(lái)我們看下是怎么做的***
首先是進(jìn)入main函數(shù)里面,構(gòu)造一個(gè)YarnApplicationMasterRunner對(duì)象,直接調(diào)用其Run方法。
> run方法主要步驟
- 獲取當(dāng)前用戶(hù)的UGI及遠(yuǎn)端UGI
- 將當(dāng)前用戶(hù)ugi里面的token傳遞到遠(yuǎn)端的UGI中,用于數(shù)據(jù)和服務(wù)訪問(wèn)
- 在遠(yuǎn)端的UGI里面執(zhí)行runApplicationMaster啟動(dòng)ApplicationMaster
![](http://upload-images.jianshu.io/upload_images/3249301-7d3ac2af1bf091f6.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

> runApplicationMaster主要過(guò)程,這里注釋很清楚,我只撿重要的提示下
- 1) load and parse / validate all configurations
- 2) start the actor system,try to start the actor system, JobManager and JobManager actor system
- 3) Generate the configuration for the TaskManagers,這里主要是JobManager的地址,taskManager注冊(cè)的超時(shí)時(shí)間,slot個(gè)數(shù),這里還有最重要的一步是構(gòu)造TaskManager的ContainerLaunchContext,這個(gè)context里面包含了啟動(dòng)TaskManager的啟動(dòng)命令,***主類(lèi)是YarnTaskManager***。
-  start the actors and components in this order:
  1) JobManager & Archive (in non-HA case, the leader service takes this),啟動(dòng)JobManagerActor,這里主類(lèi)是***YarnJobManager***
  2) Web Monitor (we need its port to register) 啟動(dòng)WEB監(jiān)控頁(yè)面,創(chuàng)建LeaderRetrievalService對(duì)象,這個(gè)主要用于啟動(dòng)TaskManager的時(shí)候,告訴TaskManager JobManager得akka url,用于TaskManager向JobManager進(jìn)行注冊(cè)。
  3) Resource Master for YARN   啟動(dòng)YarnFlinkResourceManager Actor,這里主要用于Flink container資源的管理包括申請(qǐng)與釋放等。
  4) Process reapers for the JobManager and Resource Master
***這里主要介紹YarnApplicationMasterRunner 是如何通過(guò)YarnFlinkResourceManager去完成container的申請(qǐng)與啟動(dòng)TaskManager的,這里相對(duì)來(lái)說(shuō),比較復(fù)雜,我跟到Y(jié)arn的代碼里才算整明白***

![YarnFlinkResourceManager的繼承關(guān)系](http://upload-images.jianshu.io/upload_images/3249301-cbc8215f8c356913.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
說(shuō)明YarnFlinkResourceManager其實(shí)是一個(gè)actor,在runApplicationMaster方法中,通過(guò)下面的代碼啟動(dòng)這個(gè)Actor

Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
getResourceManagerClass(),//YarnFlinkResourceManager
config,
yarnConfig,
leaderRetriever,
appMasterHostname,
webMonitorURL,
taskManagerParameters,
taskManagerContext,
numInitialTaskManagers,
LOG);
ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);//啟動(dòng)YarnFlinkResourceManager actor

接著看下YarnFlinkResourceManager 的構(gòu)造方法,這里主要有三個(gè)成員變量比較重要

//在yarn 的rm端會(huì)調(diào)用該回對(duì)象的回調(diào)函數(shù)進(jìn)行container申請(qǐng),resourceManagerCallbackHandler里面只有該actor的actor ref,所以回調(diào)的過(guò)程中能夠與該actor進(jìn)行通信
/** Callback handler for the asynchronous resourceManagerClient /
private YarnResourceManagerCallbackHandler resourceManagerCallbackHandler;
//AM與RM通信的client,resourceManagerClient對(duì)象持有resourceManagerCallbackHandler
/
* Client to communicate with the Resource Manager (YARN's master) /
private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
//AM與NM的通信client
/
* Client to communicate with the Node manager and launch TaskManager processes */
private NMClient nodeManagerClient;

YarnFlinkResourceManager 啟動(dòng)的過(guò)程先執(zhí)行preStart方法,自己沒(méi)有實(shí)現(xiàn)則執(zhí)行其父類(lèi)FlinkResourceManager的preStart方法。接著調(diào)用YarnFlinkResourceManager 的initialize方法。
> ***在initialize方法里面***
*** resourceManagerClient.start() ----> 
  AMRMClientAsyncImpl.serviceStart()--->
  CallbackHandlerThread.start()(守護(hù)線程)--->
YarnResourceManagerCallbackHandler.onContainersAllocated(allocated)---> yarnFrameworkMaster.tell(new ContainersAllocated(containers),ActorRef.noSender())(yarnFrameworkMaster為YarnFlinkResourceManager ActorRef) -->
YarnFlinkResourceManager .containersAllocated -->
NMClient.startContainer(container, taskManagerLaunchContext)
至此通知各個(gè)NM啟動(dòng)container。***
![](http://upload-images.jianshu.io/upload_images/3249301-3a5b6377e93742b9.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-2c532578dc7a508d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-6a2cd3067970fe39.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-e14cfee9289c57bf.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-2e3b73f252030ea8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-a3239382a6bde777.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-0a52a463c263bbd7.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-52cecffb2670cc67.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

***至此,YarnApplicationMasterRunner 重要的流程已經(jīng)說(shuō)完,細(xì)節(jié)東西太多,就不再說(shuō)了,有時(shí)間再看,接下來(lái)看YarnTaskManager的部分***

## 3. YarnTaskManager啟動(dòng)流程分析

***接上面nodeManagerClient.startContainer(container, taskManagerLaunchContext)將通知NM去啟動(dòng)container,NM根據(jù)taskManagerLaunchContext的啟動(dòng)信息,從HDFS下載YarnTaskManager啟動(dòng)過(guò)程依賴(lài)的jar和配置文件
(container_tokens  default_container_executor_session.sh  default_container_executor.sh  flink-conf.yaml  flink.jar  launch_container.sh  lib  log4j.properties  logback.xml),然后shell執(zhí)行l(wèi)aunch_container.sh,最終用java -cp啟動(dòng)YarnTaskManager進(jìn)程,啟動(dòng)進(jìn)程的時(shí)候首先執(zhí)行YarnTaskManager run方法,TaskManager會(huì)拿到JobManager的akka地址,然后發(fā)送注冊(cè)消息,JobManager收到注冊(cè)消息以后,注冊(cè)成功之后就發(fā)送ack確認(rèn)注冊(cè)信息給TaskManager,然后TaskManger根據(jù)配置以及JobManager返回過(guò)來(lái)的信息構(gòu)建一些真正干活的成員變量。過(guò)程:***
> 
YarnTaskManagerRunner.runYarnTaskManager(args, classOf[YarnTaskManager])-->
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager)-->
TaskManager.runTaskManager -->
TaskManager.startTaskManagerComponentsAndActor-->
actorSystem.actorOf(tmProps, actorName)-->
TaskManager.preStart-->
StandaloneLeaderRetrievalService.start(TaskManager)-->
TaskManger.notifyLeaderAddress-->
TaskManager.handleJobManagerLeaderAddress-->
TaskManager.triggerTaskManagerRegistration()
TaskManager.handleRegistrationMessage-->
instanceManager.registerTaskManager-->
jobManager 發(fā)送消息AcknowledgeRegistration給TaskManager
TaskManager.associateWithJobManager-->



![](http://upload-images.jianshu.io/upload_images/3249301-8797abe4fead540e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-6ada492f0e25718b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-7d9e8a6893af056a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-e4116544f6c7a677.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-03e551a6ebf785c1.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![Paste_Image.png](http://upload-images.jianshu.io/upload_images/3249301-155d80ccd8b5bed4.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![Paste_Image.png](http://upload-images.jianshu.io/upload_images/3249301-40bbe88a86f090c8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![Paste_Image.png](http://upload-images.jianshu.io/upload_images/3249301-2226948e829a5add.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![Paste_Image.png](http://upload-images.jianshu.io/upload_images/3249301-b9c444d706737b6f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-3ff9f4e9243c1264.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-08823728261241ec.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![associateWithJobManager](http://upload-images.jianshu.io/upload_images/3249301-f3aa8048869975c8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-1bb10a80436aa199.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](http://upload-images.jianshu.io/upload_images/3249301-442effae29613f2c.png)




###基本上Flink on yarn的流程就是這樣,細(xì)節(jié)需要深入,有不正確的地方,希望給予指正。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容