轉(zhuǎn)行寫(xiě)spark程序快一年時(shí)間了,我最深刻的體會(huì)是實(shí)現(xiàn)功能容易,但如何提高程序的執(zhí)行效率卻是個(gè)難題。我們用的spark主要是spark sql框架,使用spark sql實(shí)現(xiàn)數(shù)據(jù)的清洗、抽取以及計(jì)算。期間,我們用了大部分的時(shí)間對(duì)程序做優(yōu)化,現(xiàn)將對(duì)程序的優(yōu)化方法總結(jié)如下:
1. 數(shù)據(jù)存儲(chǔ)優(yōu)化
在數(shù)據(jù)存儲(chǔ)上,經(jīng)過(guò)了從hdfs切換到cassandra,再?gòu)腸assandra換到內(nèi)存文件系統(tǒng)alluxio上。
1.1 hadfs存儲(chǔ)
hdfs是hadoop的分布式文件系統(tǒng),被設(shè)計(jì)成適合運(yùn)行在通用硬件上的分布式文件系統(tǒng)。能夠存儲(chǔ)多種格式的數(shù)據(jù),包括文本及parquet格式等等。它是一個(gè)主從結(jié)構(gòu),一個(gè)hdfs集群是由一個(gè)名字節(jié)點(diǎn),它是一個(gè)管理文件命名空間和調(diào)節(jié)客戶(hù)端訪(fǎng)問(wèn)文件的主服務(wù)器,當(dāng)然還有一些數(shù)據(jù)節(jié)點(diǎn),通常是一個(gè)節(jié)點(diǎn)一個(gè)機(jī)器,它來(lái)管理對(duì)應(yīng)節(jié)點(diǎn)的存儲(chǔ),如下圖1所示。hdfs對(duì)外開(kāi)放文件命名空間并允許用戶(hù)數(shù)據(jù)以文件形式存儲(chǔ)。

hdfs內(nèi)部是將一個(gè)文件分割成一個(gè)或多個(gè)塊,這些塊被存儲(chǔ)在一組數(shù)據(jù)節(jié)點(diǎn)中。名字節(jié)點(diǎn)用來(lái)操作文件命名空間的文件或目錄操作,如打開(kāi),關(guān)閉,重命名等等。它同時(shí)確定塊與數(shù)據(jù)節(jié)點(diǎn)的映射。數(shù)據(jù)節(jié)點(diǎn)負(fù)責(zé)來(lái)自文件系統(tǒng)客戶(hù)的讀寫(xiě)請(qǐng)求。數(shù)據(jù)節(jié)點(diǎn)同時(shí)還要執(zhí)行塊的創(chuàng)建,刪除,和來(lái)自名字節(jié)點(diǎn)的塊復(fù)制指令。
1.2 cassandra存儲(chǔ)
cassandra是一個(gè)面向帶索引的Nosql數(shù)據(jù)庫(kù),數(shù)據(jù)是以松散結(jié)構(gòu)的多維哈希表存儲(chǔ)在數(shù)據(jù)庫(kù)中。它最大的優(yōu)點(diǎn)就是寫(xiě)速度非??欤⑶矣捎谟兴饕?,可以避免重復(fù)數(shù)據(jù)的出現(xiàn),有自動(dòng)去重的功能,而讀的速度卻不那么盡如人意。
1.3 alluxio+parquet存儲(chǔ)
由于使用hdfs和cassandra加載數(shù)據(jù)時(shí),數(shù)據(jù)的加載時(shí)間就占用了很大部分,甚至占用了一半的時(shí)間。基于數(shù)據(jù)加載速度的考慮,后來(lái)采用alluxio存儲(chǔ)需要加載的數(shù)據(jù),使用hdfs作為持久層,這樣數(shù)據(jù)可以直接從內(nèi)存加載,加載的速度大大提高了。
現(xiàn)在的spark程序主要使用spark sql框架,程序結(jié)構(gòu)如下圖2所示。spark sql程序運(yùn)行在yarn集群上,直接訪(fǎng)問(wèn)alluxio內(nèi)存文件系統(tǒng)加載數(shù)據(jù),持久化的數(shù)據(jù)存儲(chǔ)在hdfs防止丟失。alluxio作為一個(gè)內(nèi)存的文件系統(tǒng),也可以存放多種類(lèi)型的格式的文件,為了節(jié)省存儲(chǔ)開(kāi)銷(xiāo),提高訪(fǎng)問(wèn)速度,此處我們?cè)赼lluxio上使用parquet格式存儲(chǔ)數(shù)據(jù)。

2. 邏輯結(jié)構(gòu)優(yōu)化
2.1 數(shù)據(jù)加載邏輯優(yōu)化
由于程序計(jì)算需要加載的歷史數(shù)據(jù)較多,而程序的每輪執(zhí)行都要加載歷史數(shù)據(jù),歷史數(shù)據(jù)的加載占用了數(shù)據(jù)加載的大部分時(shí)間。因此我們后來(lái)改用數(shù)據(jù)增量加載的形式,即將數(shù)據(jù)分天存放。每次程序重啟時(shí),先加載歷史數(shù)據(jù),即當(dāng)天以前的數(shù)據(jù),將歷史數(shù)據(jù)加載后進(jìn)行預(yù)處理后,保存在內(nèi)存中。以后每輪就只需加載當(dāng)天數(shù)據(jù),再跟歷史數(shù)據(jù)合并。這樣以后每輪都避免了重新加載歷史數(shù)據(jù),并進(jìn)行預(yù)處理的時(shí)間,顯著地提高了程序的執(zhí)行效率。
2.2 數(shù)據(jù)分區(qū)合并
我們使用alluxio,以parquet的格式存儲(chǔ)源數(shù)據(jù)時(shí),如果小文件過(guò)多,數(shù)據(jù)的加載速度會(huì)慢很多。因此在存儲(chǔ)源數(shù)據(jù)時(shí),建議使用coalesce將數(shù)據(jù)合并到幾個(gè)分區(qū)中,以防止小文件過(guò)多。雖然reparation也能夠?qū)崿F(xiàn)分區(qū)聚合功能,但reparation會(huì)出現(xiàn)shuffle,嚴(yán)重影響spark程序的執(zhí)行效率。
3. 配置參數(shù)優(yōu)化
3.1 任務(wù)調(diào)度模式修改
spark的任務(wù)調(diào)度模式分為FIFO(先進(jìn)先出)和FAIR(公平競(jìng)爭(zhēng))。FIFO的調(diào)度機(jī)制是將隊(duì)列中的job按照先進(jìn)先出的方式進(jìn)行調(diào)度執(zhí)行,而FAIR則按照是使需要資源較少的任務(wù)先執(zhí)行,如所有任務(wù)都得按先進(jìn)先出的方式,則小作業(yè)也被阻塞不能執(zhí)行。設(shè)置調(diào)度模式的方式,只需一行代碼如下:
sparkConf.set(“spark.scheduler.mode”,”FAIR”)
3.2 執(zhí)行并行度參數(shù)設(shè)置
并行度決定了spark作業(yè)劃分task的數(shù)量,一般情況下,task越多,程序執(zhí)行的并行度就越高。但task數(shù)量也不能太多,因?yàn)閠ask的創(chuàng)建也需要耗時(shí)間和內(nèi)存資源。一般建議設(shè)置的并行度是num-executorsexecutor-cores的2-3倍。如果不設(shè)置,程序默認(rèn)的并行度只是num-executorsexecutor-cores的不到2倍。
4.shuffle調(diào)優(yōu)
4.1 定義廣播變量
大多數(shù)spark作業(yè)的性能主要就是消耗在了shuffle環(huán)節(jié),因?yàn)樵摥h(huán)節(jié)包含了大量的磁盤(pán)IO、序列化、網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)炔僮鳌R虼?,如果要讓作業(yè)的性能更上一層樓,就有必要對(duì)shuffle過(guò)程進(jìn)行調(diào)優(yōu)。shuffle調(diào)優(yōu)的一個(gè)途徑就是盡可能地減少shuffle,在兩個(gè)表join的過(guò)程中,如果一個(gè)表不會(huì)經(jīng)常改變,同時(shí)數(shù)據(jù)量又不會(huì)太大時(shí),將這個(gè)表廣播出去,這樣集群上的每個(gè)節(jié)點(diǎn)上都會(huì)保存這個(gè)表,這樣需要join操作的另一個(gè)表就可以在自己的節(jié)點(diǎn)上完成關(guān)聯(lián)操作,可以盡可能地減少shuffle。
4.2 減少數(shù)據(jù)傾斜
在進(jìn)行shuffle的時(shí)候,必須將各個(gè)節(jié)點(diǎn)上相同的key拉取到某個(gè)節(jié)點(diǎn)上的一個(gè)task來(lái)進(jìn)行處理,比如按照key進(jìn)行聚合或join等操作。此時(shí)如果某個(gè)key對(duì)應(yīng)的數(shù)據(jù)量特別大的話(huà),就會(huì)發(fā)生數(shù)據(jù)傾斜。比如大部分key對(duì)應(yīng)10條數(shù)據(jù),但是個(gè)別key卻對(duì)應(yīng)了100萬(wàn)條數(shù)據(jù),那么大部分task可能就只會(huì)分配到10條數(shù)據(jù),然后1秒鐘就運(yùn)行完了;但是個(gè)別task可能分配到了100萬(wàn)數(shù)據(jù),要運(yùn)行一兩個(gè)小時(shí)。因此,整個(gè)Spark作業(yè)的運(yùn)行進(jìn)度是由運(yùn)行時(shí)間最長(zhǎng)的那個(gè)task決定的。
因此出現(xiàn)數(shù)據(jù)傾斜的時(shí)候,spark作業(yè)運(yùn)行得非常緩慢,甚至可能因?yàn)槟硞€(gè)task處理的數(shù)據(jù)量過(guò)大導(dǎo)致內(nèi)存溢出。數(shù)據(jù)傾斜的例子如下圖所示,在三個(gè)節(jié)點(diǎn)上對(duì)應(yīng)的以hello為key的總共有6條數(shù)據(jù),這些數(shù)據(jù)都會(huì)被拉取到同一個(gè)task中進(jìn)行處理,而以world為key的只有3條。實(shí)際情況中task可能個(gè)別key的數(shù)據(jù)量可能更大,比key少的可能多n多倍。因此key多的task的運(yùn)行速度可能會(huì)比key少的task執(zhí)行速度要慢n倍,而整個(gè)程序的執(zhí)行速度是由最慢的task決定的。同時(shí),如果某一個(gè)task處理的數(shù)據(jù)量過(guò)多的話(huà),還會(huì)出現(xiàn)內(nèi)存溢出的危險(xiǎn)。

實(shí)際寫(xiě)spark程序時(shí),要盡量避免出現(xiàn)數(shù)據(jù)傾斜的情況,如果出現(xiàn)上述現(xiàn)象時(shí),原則是采用兩階段聚合的方式。首先針對(duì)某些key較多的數(shù)據(jù),進(jìn)行拆分,將一個(gè)key拆分成多個(gè),比如上圖3,對(duì)hello進(jìn)行拆分,將hello前面加上1-10之間的任意隨機(jī)數(shù),變成1_hello,2_hello直到10_hello,對(duì)這些拆分后的key首先聚合,聚合后變成(1_hello,2),(2_hello,2)在再將這些隨機(jī)數(shù)前綴去掉,再進(jìn)行聚合,實(shí)現(xiàn)方式如下圖4。

5. 升級(jí)spark版本
還有一種較為有效的辦法是升級(jí)spark版本,spark 2.1版本較spark 1.6版本性能有更多的提升。其中的優(yōu)勢(shì)在《spark sql執(zhí)行流程》的最后有所介紹。