當(dāng)我們使用spark-submit提交一個(gè)Spark任務(wù)后,這個(gè)任務(wù)就會(huì)啟動(dòng)一個(gè)對(duì)應(yīng)的Driver進(jìn)程,然后根據(jù)你設(shè)置的部署模式(deploy-mode)的不同,Driver進(jìn)程可能在本地啟動(dòng),也可能在集群的某個(gè)工作節(jié)點(diǎn)上啟動(dòng)。Driver進(jìn)程本身會(huì)根據(jù)我們?cè)O(shè)置的參數(shù),占用一定的內(nèi)存、CPU和core。當(dāng)Driver進(jìn)程啟動(dòng)后,它首先會(huì)向集群管理器(可以是Spark Standalone集群,也可以是其他的資源管理集群,美團(tuán)?大眾點(diǎn)評(píng)使用的是YARN作為資源管理集群)申請(qǐng)運(yùn)行Spark任務(wù)所需要的資源,即Executor進(jìn)程,yarn集群管理器會(huì)根據(jù)我們提交Spark任務(wù)時(shí)設(shè)置的資源參數(shù),在各個(gè)工作節(jié)點(diǎn)上啟動(dòng)一定數(shù)量的Executor進(jìn)程,每個(gè)Executor進(jìn)程都占有一定數(shù)量的內(nèi)存、CPU和core。
在申請(qǐng)到任務(wù)執(zhí)行所需的資源后,Driver進(jìn)程就開(kāi)始調(diào)度和執(zhí)行我們提交的代碼了,Driver進(jìn)程會(huì)將我們提交的Spark代碼拆分成多個(gè)stage,每個(gè)stage執(zhí)行代碼的一部分片段,并為每個(gè)stage分配一批task,然后將這些task分配到各個(gè)Executor進(jìn)程中執(zhí)行。task是最小的計(jì)算單元,負(fù)責(zé)執(zhí)行一模一樣的計(jì)算邏輯(即我們自己編寫(xiě)的某個(gè)代碼片段),只是每個(gè)task中處理的數(shù)據(jù)不同而已,一個(gè)stage中的所有task任務(wù)執(zhí)行完成后,會(huì)在各個(gè)節(jié)點(diǎn)的本地磁盤(pán)中文件中寫(xiě)入中間的計(jì)算結(jié)果,然后Driver就會(huì)調(diào)度執(zhí)行下一個(gè)stage,下一個(gè)stage的輸入數(shù)據(jù)就是上一個(gè)stage輸出的中間結(jié)果,如此循環(huán)往復(fù),直到將我們自己編寫(xiě)的代碼邏輯全部執(zhí)行完,并且計(jì)算完所有的數(shù)據(jù),得到我們想要的結(jié)果為止。
Spark是根據(jù)shuffle類算子來(lái)進(jìn)行stage的劃分。如果我們的代碼中執(zhí)行了某個(gè)shuffle類算子(比如reduceByKey、join等),那么就會(huì)在該算子處,劃分出一個(gè)stage界限來(lái)??梢源笾吕斫鉃?,shuffle算子執(zhí)行之前的代碼會(huì)被劃分為一個(gè)stage,shuffle算子執(zhí)行以及之后的代碼會(huì)被劃分為下一個(gè)stage。因此一個(gè)stage剛開(kāi)始執(zhí)行的時(shí)候,它的每個(gè)task可能都會(huì)從上一個(gè)stage的task所在的節(jié)點(diǎn),去通過(guò)網(wǎng)絡(luò)傳輸拉取需要自己處理的所有key,然后對(duì)拉取到的所有相同的key使用我們自己編寫(xiě)的算子函數(shù)執(zhí)行聚合操作(比如reduceByKey()算子接收的函數(shù))。這個(gè)過(guò)程就是shuffle。
當(dāng)我們?cè)诖a中執(zhí)行了cache/persist等持久化操作時(shí),根據(jù)我們選擇的持久化級(jí)別的不同,每個(gè)task計(jì)算出來(lái)的數(shù)據(jù)也會(huì)保存到Executor進(jìn)程的內(nèi)存或者所在節(jié)點(diǎn)的磁盤(pán)文件中。
因此Executor的內(nèi)存主要分為三塊:第一塊是讓task執(zhí)行我們自己編寫(xiě)的代碼時(shí)使用,默認(rèn)是占Executor總內(nèi)存的20%;第二塊是讓task通過(guò)shuffle過(guò)程拉取了上一個(gè)stage的task的輸出后,進(jìn)行聚合等操作時(shí)使用,默認(rèn)也是占Executor總內(nèi)存的20%;第三塊是讓RDD持久化時(shí)使用,默認(rèn)占Executor總內(nèi)存的60%。
task的執(zhí)行速度是跟每個(gè)Executor進(jìn)程的CPU core數(shù)量有直接關(guān)系的。一個(gè)CPU core同一時(shí)間只能執(zhí)行一個(gè)線程。而每個(gè)Executor進(jìn)程上分配到的多個(gè)task,都是以每個(gè)task一條線程的方式,多線程并發(fā)運(yùn)行的。如果CPU core數(shù)量比較充足,而且分配到的task數(shù)量比較合理,那么通常來(lái)說(shuō),可以比較快速和高效地執(zhí)行完這些task線程。