1 spark on yarn常用屬性介紹
| 屬性名 | 默認(rèn)值 | 屬性說明 |
|---|---|---|
spark.yarn.am.memory |
512m | 在客戶端模式(client mode)下,yarn應(yīng)用master使用的內(nèi)存數(shù)。在集群模式(cluster mode)下,使用spark.driver.memory代替。 |
spark.driver.cores |
1 | 在集群模式(cluster mode)下,driver程序使用的核數(shù)。在集群模式(cluster mode)下,driver程序和master運行在同一個jvm中,所以master控制這個核數(shù)。在客戶端模式(client mode)下,使用spark.yarn.am.cores控制master使用的核。 |
spark.yarn.am.cores |
1 | 在客戶端模式(client mode)下,yarn應(yīng)用的master使用的核數(shù)。在集群模式下,使用spark.driver.cores代替。 |
spark.yarn.am.waitTime |
100ms | 在集群模式(cluster mode)下,yarn應(yīng)用master等待SparkContext初始化的時間。在客戶端模式(client mode)下,master等待driver連接到它的時間。 |
spark.yarn.submit.file.replication |
3 | 文件上傳到hdfs上去的replication次數(shù) |
spark.yarn.preserve.staging.files |
false |
設(shè)置為true時,在job結(jié)束時,保留staged文件;否則刪掉這些文件。 |
spark.yarn.scheduler.heartbeat.interval-ms |
3000 |
Spark應(yīng)用master與yarn resourcemanager之間的心跳間隔 |
spark.yarn.scheduler.initial-allocation.interval |
200ms | 當(dāng)存在掛起的容器分配請求時,spark應(yīng)用master發(fā)送心跳給resourcemanager的間隔時間。它的大小不能大于spark.yarn.scheduler.heartbeat.interval-ms,如果掛起的請求還存在,那么這個時間加倍,直到到達spark.yarn.scheduler.heartbeat.interval-ms大小。 |
spark.yarn.max.executor.failures |
numExecutors * 2,并且不小于3 |
在失敗應(yīng)用程序之前,executor失敗的最大次數(shù)。 |
spark.executor.instances |
2 |
Executors的個數(shù)。這個配置和spark.dynamicAllocation.enabled不兼容。當(dāng)同時配置這兩個配置時,動態(tài)分配關(guān)閉,spark.executor.instances被使用 |
spark.yarn.executor.memoryOverhead |
executorMemory * 0.10,并且不小于384m
|
每個executor分配的堆外內(nèi)存。 |
spark.yarn.driver.memoryOverhead |
driverMemory * 0.10,并且不小于384m
|
在集群模式下,每個driver分配的堆外內(nèi)存。 |
spark.yarn.am.memoryOverhead |
AM memory * 0.10,并且不小于384m
|
在客戶端模式下,每個driver分配的堆外內(nèi)存 |
spark.yarn.am.port |
隨機 |
Yarn 應(yīng)用master監(jiān)聽的端口。 |
spark.yarn.queue |
default |
應(yīng)用提交的yarn隊列的名稱 |
spark.yarn.jar |
none |
Jar文件存放的地方。默認(rèn)情況下,spark jar安裝在本地,但是jar也可以放在hdfs上,其他機器也可以共享。 |
2 客戶端模式和集群模式的區(qū)別
這里我們要區(qū)分一下什么是客戶端模式(client mode),什么是集群模式(cluster mode)。
我們知道,當(dāng)在YARN上運行Spark作業(yè)時,每個Spark executor作為一個YARN容器(container)運行。Spark可以使得多個Tasks在同一個容器(container)里面運行。 yarn-cluster和yarn-client模式的區(qū)別其實就是Application Master進程的區(qū)別,在yarn-cluster模式下,driver運行在AM(Application Master)中,它負(fù)責(zé)向YARN申請資源,并監(jiān)督作業(yè)的運行狀況。當(dāng)用戶提交了作業(yè)之后,就可以關(guān)掉Client,作業(yè)會繼續(xù)在YARN上運行。然而yarn-cluster模式不適合運行交互類型的作業(yè)。 在yarn-client模式下,Application Master僅僅向YARN請求executor,client會和請求的container通信來調(diào)度他們工作,也就是說Client不能離開。下面的圖形象表示了兩者的區(qū)別。


2.1 Spark on YARN集群模式分析
2.1.1 客戶端操作
- 1、根據(jù)
yarnConf來初始化yarnClient,并啟動yarnClient; - 2、創(chuàng)建客戶端
Application,并獲取Application的ID,進一步判斷集群中的資源是否滿足executor和ApplicationMaster申請的資源,如果不滿足則拋出IllegalArgumentException; - 3、設(shè)置資源、環(huán)境變量:其中包括了設(shè)置
Application的Staging目錄、準(zhǔn)備本地資源(jar文件、log4j.properties)、設(shè)置Application其中的環(huán)境變量、創(chuàng)建Container啟動的Context等; - 4、設(shè)置
Application提交的Context,包括設(shè)置應(yīng)用的名字、隊列、AM的申請的Container、標(biāo)記該作業(yè)的類型為Spark; - 5、申請
Memory,并最終通過yarnClient.submitApplication向ResourceManager提交該Application。
當(dāng)作業(yè)提交到YARN上之后,客戶端就沒事了,甚至在終端關(guān)掉那個進程也沒事,因為整個作業(yè)運行在YARN集群上進行,運行的結(jié)果將會保存到HDFS或者日志中。
2.1.2 提交到Y(jié)ARN集群,YARN操作
- 1、運行
ApplicationMaster的run方法; - 2、設(shè)置好相關(guān)的環(huán)境變量。
- 3、創(chuàng)建
amClient,并啟動; - 4、在
Spark UI啟動之前設(shè)置Spark UI的AmIpFilter; - 5、在
startUserClass函數(shù)專門啟動了一個線程(名稱為Driver的線程)來啟動用戶提交的Application,也就是啟動了Driver。在Driver中將會初始化SparkContext; - 6、等待
SparkContext初始化完成,最多等待spark.yarn.applicationMaster.waitTries次數(shù)(默認(rèn)為10),如果等待了的次數(shù)超過了配置的,程序?qū)顺?;否則用SparkContext初始化yarnAllocator; - 7、當(dāng)
SparkContext、Driver初始化完成的時候,通過amClient向ResourceManager注冊ApplicationMaster; - 8、分配并啟動
Executeors。在啟動Executeors之前,先要通過yarnAllocator獲取到numExecutors個Container,然后在Container中啟動Executeors。 如果在啟動Executeors的過程中失敗的次數(shù)達到了maxNumExecutorFailures的次數(shù),maxNumExecutorFailures的計算規(guī)則如下:
// Default to numExecutors * 2, with minimum of 3
private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
那么這個Application將失敗,將Application Status標(biāo)明為FAILED,并將關(guān)閉SparkContext。其實,啟動Executeors是通過ExecutorRunnable實現(xiàn)的,而ExecutorRunnable內(nèi)部是啟動CoarseGrainedExecutorBackend的。
- 9、最后,
Task將在CoarseGrainedExecutorBackend里面運行,然后運行狀況會通過Akka通知CoarseGrainedScheduler,直到作業(yè)運行完成。
2.2 Spark on YARN客戶端模式分析
和yarn-cluster模式一樣,整個程序也是通過spark-submit腳本提交的。但是yarn-client作業(yè)程序的運行不需要通過Client類來封裝啟動,而是直接通過反射機制調(diào)用作業(yè)的main函數(shù)。下面是流程。
- 1、通過
SparkSubmit類的launch的函數(shù)直接調(diào)用作業(yè)的main函數(shù)(通過反射機制實現(xiàn)),如果是集群模式就會調(diào)用Client的main函數(shù)。 - 2、而應(yīng)用程序的
main函數(shù)一定都有個SparkContent,并對其進行初始化; - 3、在
SparkContent初始化中將會依次做如下的事情:設(shè)置相關(guān)的配置、注冊MapOutputTracker、BlockManagerMaster、BlockManager,創(chuàng)建taskScheduler和dagScheduler; - 4、初始化完
taskScheduler后,將創(chuàng)建dagScheduler,然后通過taskScheduler.start()啟動taskScheduler,而在taskScheduler啟動的過程中也會調(diào)用SchedulerBackend的start方法。 在SchedulerBackend啟動的過程中將會初始化一些參數(shù),封裝在ClientArguments中,并將封裝好的ClientArguments傳進Client類中,并client.runApp()方法獲取Application ID。 - 5、
client.runApp里面的做的和上章客戶端進行操作那節(jié)類似,不同的是在里面啟動是ExecutorLauncher(yarn-cluster模式啟動的是ApplicationMaster)。 - 6、在
ExecutorLauncher里面會初始化并啟動amClient,然后向ApplicationMaster注冊該Application。注冊完之后將會等待driver的啟動,當(dāng)driver啟動完之后,會創(chuàng)建一個MonitorActor對象用于和CoarseGrainedSchedulerBackend進行通信(只有事件AddWebUIFilter他們之間才通信,Task的運行狀況不是通過它和CoarseGrainedSchedulerBackend通信的)。 然后就是設(shè)置addAmIpFilter,當(dāng)作業(yè)完成的時候,ExecutorLauncher將通過amClient設(shè)置Application的狀態(tài)為FinalApplicationStatus.SUCCEEDED。 - 7、分配
Executors,這里面的分配邏輯和yarn-cluster里面類似。 - 8、最后,
Task將在CoarseGrainedExecutorBackend里面運行,然后運行狀況會通過Akka通知CoarseGrainedScheduler,直到作業(yè)運行完成。 - 9、在作業(yè)運行的時候,
YarnClientSchedulerBackend會每隔1秒通過client獲取到作業(yè)的運行狀況,并打印出相應(yīng)的運行信息,當(dāng)Application的狀態(tài)是FINISHED、FAILED和KILLED中的一種,那么程序?qū)⑼顺龅却?/li> - 10、最后有個線程會再次確認(rèn)
Application的狀態(tài),當(dāng)Application的狀態(tài)是FINISHED、FAILED和KILLED中的一種,程序就運行完成,并停止SparkContext。整個過程就結(jié)束了。
3 spark submit 和 spark shell參數(shù)介紹
| 參數(shù)名 | 格式 | 參數(shù)說明 |
|---|---|---|
| --master | MASTER_URL | 如spark://host:port |
| --deploy-mode | DEPLOY_MODE | Client或者master,默認(rèn)是client |
| --class | CLASS_NAME | 應(yīng)用程序的主類 |
| --name | NAME | 應(yīng)用程序的名稱 |
| --jars | JARS | 逗號分隔的本地jar包,包含在driver和executor的classpath下 |
| --packages | 包含在driver和executor的classpath下的jar包逗號分隔的”groupId:artifactId:version”列表 | |
| --exclude-packages | 用逗號分隔的”groupId:artifactId”列表 | |
| --repositories | 逗號分隔的遠程倉庫 | |
| --py-files | PY_FILES | 逗號分隔的”.zip”,”.egg”或者“.py”文件,這些文件放在python app的PYTHONPATH下面 |
| --files | FILES | 逗號分隔的文件,這些文件放在每個executor的工作目錄下面 |
| --conf | PROP=VALUE | 固定的spark配置屬性 |
| --properties-file | FILE | 加載額外屬性的文件 |
| --driver-memory | MEM | Driver內(nèi)存,默認(rèn)1G |
| --driver-java-options | 傳給driver的額外的Java選項 | |
| --driver-library-path | 傳給driver的額外的庫路徑 | |
| --driver-class-path | 傳給driver的額外的類路徑 | |
| --executor-memory | MEM | 每個executor的內(nèi)存,默認(rèn)是1G |
| --proxy-user | NAME | 模擬提交應(yīng)用程序的用戶 |
| --driver-cores | NUM | Driver的核數(shù),默認(rèn)是1。這個參數(shù)僅僅在standalone集群deploy模式下使用 |
| --supervise | Driver失敗時,重啟driver。在mesos或者standalone下使用 | |
| --verbose | 打印debug信息 | |
| --total-executor-cores | NUM | 所有executor總共的核數(shù)。僅僅在mesos或者standalone下使用 |
| --executor-core | NUM | 每個executor的核數(shù)。在yarn或者standalone下使用 |
| --driver-cores | NUM | Driver的核數(shù),默認(rèn)是1。在yarn集群模式下使用 |
| --queue | QUEUE_NAME | 隊列名稱。在yarn下使用 |
| --num-executors | NUM | 啟動的executor數(shù)量。默認(rèn)為2。在yarn下使用 |
你可以通過spark-submit --help或者spark-shell --help來查看這些參數(shù)。
參考文獻
【1】Spark:Yarn-cluster和Yarn-client區(qū)別與聯(lián)系