數(shù)據(jù)本地化背景
數(shù)據(jù)本地化對(duì)于Spark Job性能有著巨大的影響。如果數(shù)據(jù)以及要計(jì)算它的代碼是在一起的,那么性能當(dāng)然會(huì)非常高。但是,如果數(shù)據(jù)和計(jì)算它的代碼是分開的,那么其中之一必須到另外一方的機(jī)器上。通常來說,移動(dòng)代碼到其他節(jié)點(diǎn),會(huì)比移動(dòng)數(shù)據(jù)到代碼所在的節(jié)點(diǎn)上去,速度要快得多,因?yàn)榇a比較小。Spark也正是基于這個(gè)數(shù)據(jù)本地化的原則來構(gòu)建task調(diào)度算法的。
數(shù)據(jù)本地化,指的是,數(shù)據(jù)離計(jì)算它的代碼有多近?;跀?shù)據(jù)距離代碼的距離,有幾種數(shù)據(jù)本地化級(jí)別:
- PROCESS_LOCAL:數(shù)據(jù)和計(jì)算它的代碼在同一個(gè)JVM進(jìn)程中。
- NODE_LOCAL:數(shù)據(jù)和計(jì)算它的代碼在一個(gè)節(jié)點(diǎn)上,但是不在一個(gè)進(jìn)程中,比如在不同的executor進(jìn)程中,或者是數(shù)據(jù)在HDFS文件的block中。
- NO_PREF:數(shù)據(jù)從哪里過來,性能都是一樣的。
- RACK_LOCAL:數(shù)據(jù)和計(jì)算它的代碼在一個(gè)機(jī)架上。
- ANY:數(shù)據(jù)可能在任意地方,比如其他網(wǎng)絡(luò)環(huán)境內(nèi),或者其他機(jī)架上。
原理

Task要處理的partition的數(shù)據(jù),在某一個(gè)Executor中,TaskScheduler首先會(huì)盡量用最好的本地化級(jí)別去啟動(dòng)task,也就是說,會(huì)盡量在哪個(gè)包含了要處理的partition的executor中,去啟動(dòng)task
此時(shí),Executor已經(jīng)再執(zhí)行好幾個(gè)task了,沒有空閑資源來執(zhí)行這個(gè)task
默認(rèn)情況下,spark會(huì)等待一會(huì),等待Executor什么時(shí)候可以空閑出一個(gè)cpu core,從而來啟動(dòng)這個(gè)task,讓它實(shí)現(xiàn)最好的本地化級(jí)別
但是如果等待了一會(huì)(時(shí)間是可以調(diào)優(yōu)的,通過參數(shù)設(shè)置),發(fā)現(xiàn)始終沒有等到Executor的core釋放,那么會(huì)放大一個(gè)級(jí)別,去嘗試啟動(dòng)這個(gè)task
如果這個(gè)rdd之前持久化過,task會(huì)去調(diào)用RDD的iterator()方法,然后通過executor關(guān)聯(lián)的BlockManager,來嘗試獲取數(shù)據(jù),BlockManager底層,首先嘗試從getLocal()在本地找數(shù)據(jù),如果沒有找到的話,那么用getRemote(),通過BlockTransferService,鏈接到有數(shù)據(jù)的BlockManager,來獲取數(shù)據(jù)
如果沒有持久化過,那么就computerOrReadCheckpoint()
如果還是不能啟動(dòng),繼續(xù)放大級(jí)別
數(shù)據(jù)本地化優(yōu)化
Spark傾向于使用最好的本地化級(jí)別來調(diào)度task,但是這是不可能的。如果沒有任何未處理的數(shù)據(jù)在空閑的executor上,那么Spark就會(huì)放低本地化級(jí)別。這時(shí)有兩個(gè)選擇:第一,等待,直到executor上的cpu釋放出來,那么就分配task過去;第二,立即在任意一個(gè)executor上啟動(dòng)一個(gè)task。
Spark默認(rèn)會(huì)等待一會(huì)兒,來期望task要處理的數(shù)據(jù)所在的節(jié)點(diǎn)上的executor空閑出一個(gè)cpu,從而將task分配過去。只要超過了時(shí)間,那么Spark就會(huì)將task分配到其他任意一個(gè)空閑的executor上。
可以設(shè)置參數(shù),spark.locality系列參數(shù),來調(diào)節(jié)Spark等待task可以進(jìn)行數(shù)據(jù)本地化的時(shí)間。spark.locality.wait(3000毫秒)、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack。