漫談Spark數(shù)據(jù)平臺(tái)下的應(yīng)用及編程

Spark 正在成為下一代大數(shù)據(jù)分析和處理的平臺(tái),這種趨勢(shì)越來(lái)越明顯。

但是,就在一年以前,筆者曾做過(guò)多方面的咨詢(xún)和了解,那時(shí)Spark 還沒(méi)有大規(guī)模地在各大企業(yè)應(yīng)用起來(lái),即使當(dāng)時(shí)某些企業(yè)已經(jīng)運(yùn)用Spark,大多卻還只是在開(kāi)始局部性的嘗試,其主要業(yè)務(wù)和數(shù)據(jù)仍然運(yùn)行在Hadoop 數(shù)據(jù)平臺(tái)上。

說(shuō)起來(lái),筆者對(duì)Spark平臺(tái)的應(yīng)用經(jīng)歷也是從一年前才開(kāi)始,從嘗試到最后熟練的應(yīng)用,也算是經(jīng)歷了不少波折。當(dāng)時(shí)自己安裝部署的是1.4.0版本,SparkR 也是從那個(gè)版本正式納入成為Spark 一部分的,由于需要做金融大數(shù)據(jù)分析,很自然的專(zhuān)門(mén)選擇了SparkR 。但是接下來(lái)的經(jīng)歷令人印象非常深刻,SparkR當(dāng)時(shí)在項(xiàng)目中根本沒(méi)法用,即使最基本的,它的很多API在集群中運(yùn)行的結(jié)果是錯(cuò)誤的,甚至是隨機(jī)的(當(dāng)時(shí)的測(cè)試,在本地多線程模擬集群環(huán)境下,結(jié)果是正確的)。

一開(kāi)始遇到有API錯(cuò)誤,經(jīng)常在深更半夜不斷地去檢測(cè)是否是人為錯(cuò)誤,確定不是人為錯(cuò)誤之后,筆者會(huì)想各種辦法去繞開(kāi)使用錯(cuò)誤的API ,還曾向開(kāi)發(fā)Spark 的伯克利大學(xué)的教授多次發(fā)郵件尋求幫助。隨后又發(fā)現(xiàn)接連好幾個(gè)其它API錯(cuò)誤,于是果斷暫時(shí)放棄了SparkR ,轉(zhuǎn)而使用Spark 的其它部分。

最后選擇的是基于Java 語(yǔ)言的Spark 應(yīng)用(RDD,SQL,ML和Hive),之所以選擇Java而不是Scala ,主要原因在于,那時(shí)并不保證Spark的其它部分沒(méi)有Bug 而可以順利運(yùn)用到項(xiàng)目里去,并且,在項(xiàng)目上線時(shí)間已很緊迫的情況下,傳統(tǒng)面向?qū)ο笳Z(yǔ)言Java遠(yuǎn)比Scala好把握(開(kāi)發(fā)進(jìn)度和調(diào)試)。

本文內(nèi)容主要講的是基于Java 的Spark 應(yīng)用及編程,來(lái)自于個(gè)人實(shí)戰(zhàn)和學(xué)習(xí)的一部分要點(diǎn)提煉和總結(jié)。

其實(shí),即使到現(xiàn)在,整個(gè)網(wǎng)絡(luò)上和實(shí)體店都找不到一本比較好的關(guān)于Spark應(yīng)用的中文或英文書(shū)籍,同時(shí)也沒(méi)有來(lái)自外界同行比較成熟的應(yīng)用經(jīng)驗(yàn)可供借鑒(同行經(jīng)驗(yàn)方面現(xiàn)在可能稍好一些)。關(guān)于Spark 的應(yīng)用,官方文檔是其唯一可靠資料。

但是,個(gè)人以為,官方文檔也存在一些問(wèn)題,比如Spark 的相關(guān)內(nèi)容太多,但是官網(wǎng)里對(duì)一些內(nèi)容描述實(shí)在不甚清楚,有些內(nèi)容的描述甚至沒(méi)有任何提及。因此,個(gè)人摸索便成了辦法之一。

關(guān)于編程

Hadoop 的map-reduce 計(jì)算框架高度的精煉和抽象,是Hadoop 平臺(tái)的重要基礎(chǔ)之一。

但是map-reduce框架也有比較大的弊端:

一方面,Hadoop 的shuffle 和過(guò)多的磁盤(pán)IO導(dǎo)致map-reduce 框架運(yùn)行速度太慢;

另一方面,map-reduce 固有的框架使得很多原本并不適合運(yùn)用它來(lái)進(jìn)行編程的問(wèn)題,特別是一些要求反復(fù)迭代的作業(yè)。

于是,不斷的嘗試轉(zhuǎn)換作業(yè)形式和過(guò)程以使它方便套用map-reduce 框架,成了某種需要。對(duì)此,Spark 在很多方面做了長(zhǎng)足的改進(jìn),比如Spark 可充分的基于內(nèi)存操作,有向無(wú)環(huán)圖DAG 的方式執(zhí)行作業(yè),shuffle 的改進(jìn),作業(yè)失敗后從前面某一步而不是第一步開(kāi)始重新執(zhí)行作業(yè)等等,讓Spark 的運(yùn)行速度相對(duì)Hadoop 有幾十倍甚至百倍的提升。

因此,相對(duì)Hadoop 的離線計(jì)算,Spark 是近實(shí)時(shí)的。同時(shí),Spark的算子很多且非常豐富,遠(yuǎn)遠(yuǎn)不止Hadoop 只有單純的map , reduce 和combine 若干幾個(gè),這使得Spark RDD 可以很方便的運(yùn)用到各種場(chǎng)景的應(yīng)用作業(yè)里面去。

關(guān)于Spark Java 的RDD 編程,其實(shí)也挺意外,有不少寫(xiě)法,看起來(lái)挺正常的,官網(wǎng)沒(méi)有特別說(shuō)明,編譯也沒(méi)有任何錯(cuò)誤,運(yùn)行時(shí)就是報(bào)異?;蝈e(cuò)誤,甚至錯(cuò)誤日志看起來(lái)也和錯(cuò)誤代碼處不搭界。

以下給出部分Spark Java編程的若干要點(diǎn):

RDD 的行數(shù)有限制,最大行數(shù)大概是200萬(wàn)行多一些,不然RDD 會(huì)溢出。在某些場(chǎng)景下,大行數(shù)RDD 用的上;

RDD 包含transformation 和action 兩種算子,但是,一個(gè)RDD 連續(xù)的transformation 算子個(gè)數(shù)有限制,不然調(diào)用堆棧會(huì)溢出。解決辦法可以在中間某個(gè)transformation 算子之前執(zhí)行一次checkpoint ,或者插入某個(gè)簡(jiǎn)單的action 算子,比如first 或take ;

派生算子類(lèi)Function 系列,似乎必須和static 扯上,即Function 派生類(lèi)必須定義成static函數(shù)的內(nèi)部類(lèi)或匿名類(lèi),要么必須定義成函數(shù)外部的static類(lèi),不然,運(yùn)行時(shí)會(huì)報(bào)錯(cuò);

HiveContext 和SQLContext 不可混用,也就是一個(gè)類(lèi)對(duì)象如果是用HiveContext 創(chuàng)建的,那么它不可用SQLContext來(lái)執(zhí)行SQL操作,反之亦然。這應(yīng)該和兩個(gè)Context 的底層實(shí)現(xiàn)有關(guān)系;

一個(gè)類(lèi)對(duì)象將用于Hive或SQL操作,該類(lèi)必須是public static 的,且實(shí)現(xiàn)Serializable ,該類(lèi)各個(gè)成員變量必須有set/get 方法(應(yīng)該用到了類(lèi)Java的反射機(jī)制,其實(shí)set如果不需要也可以不用寫(xiě)),并且,get方法的函數(shù)名有要求,比如一個(gè)類(lèi)有成員變量名為abcd , 則其對(duì)應(yīng)函數(shù)名必須為getAbcd ,不然,運(yùn)行時(shí)會(huì)報(bào)找不到該表列名錯(cuò)誤;

如需按行操作RDD ,可考慮用zipWithIndex 。

關(guān)于運(yùn)行錯(cuò)誤調(diào)試和作業(yè)調(diào)優(yōu)

1. cache 有時(shí)候需要多用,有時(shí)候卻又需要盡量少用,collect 某些時(shí)候要慎用,因?yàn)閏ollect 涉及到將各個(gè)node 的所有數(shù)據(jù)都通過(guò)網(wǎng)絡(luò)傳輸?shù)絛rive ,所以,大RDD 的collect 會(huì)非常耗時(shí),不是必須的情況下,可考慮用take;

2. 擅于用mapPartitions ,某種程度上它可以替代filter ,并且在某些場(chǎng)景下mapPartitions 比map 效率高,要注意mapPartitions 對(duì)只有空返回分區(qū)的特殊處理;

3. Broadcast 和傳參,是解決外部變量和Function 系列派生類(lèi)打交道的兩個(gè)方法,對(duì)于大的常量應(yīng)用Broadcast ,可提高效率。但是,外部變量有時(shí)難以用于Broadcast ,這時(shí)候可給Function 系列派生類(lèi)定義一個(gè)成員變量,在調(diào)用的時(shí)候創(chuàng)建并將外部參數(shù)傳遞給它;

4. Spark 集群有standalone ,mesos 和yarn 三種安裝模式,本地多線程也可模擬集群模式。經(jīng)常在Spark 社區(qū)看到有人問(wèn)關(guān)于Spark 的一些難題,有時(shí)卻是由于不同安裝模式導(dǎo)致的,這一點(diǎn)要注意了,yarn 據(jù)說(shuō)是最有前景的方式,由此也共享了Hadoop 的框架,個(gè)人喜歡用yarn 模式;

5. groupBy, reduceBy和aggregateBy比較

一個(gè)RDD包括多個(gè)partition (分區(qū)),其所有partition 可能分布在集群不同的node 上,同一個(gè)partition只能在某一個(gè)node 上。RDD 的分布式和并行性,即是partition 層面的。假如RDD的某一個(gè)action算子需要同時(shí)讀取來(lái)自不同分區(qū)的一些數(shù)據(jù),這時(shí)候就需要通過(guò)網(wǎng)絡(luò)IO或磁盤(pán)IO將位于不同分區(qū)的那些數(shù)據(jù)匯集到一處以作下一步的操作,這就是Spark 的shuffle (混洗)。由于shuffle操作涉及到網(wǎng)絡(luò)IO 和磁盤(pán)IO ,所以shuffle 的操作總是很耗時(shí)的,于是,盡量減少shuffle的次數(shù)和傳輸?shù)臄?shù)據(jù)量,是提高Spark作業(yè)運(yùn)行效率的一個(gè)方法。

能用reduceBy 的時(shí)候盡量不用groupBy ,reduceBy 需做兩次合并操作,一次在shuffle 之前,一次在之后,經(jīng)歷過(guò)之前一次合并操作,合并后的中間數(shù)據(jù)量會(huì)大大減少,從而減少shuffle過(guò)程中IO的時(shí)間,提高了運(yùn)行效率。groupBy 只有shuffle 之后一次合并操作,所以在它shuffle 過(guò)程需傳輸?shù)臄?shù)據(jù)量大而影響了效率。

當(dāng)Pair RDD 指定列的輸入和輸出類(lèi)型不一致的時(shí)候,應(yīng)不要用reduceBy ,而考慮用aggregateBy 。比如,需要找到Pair RDD各個(gè)key的所有不同的value ,如用reduceBy,可考慮輸出類(lèi)型為set,于是,每一次都需要為新的set 動(dòng)態(tài)分配內(nèi)存空間,效率很受影響。aggregateBy 在map 端做聚合操作會(huì)更高效。

關(guān)于reduceByKey 和groupByKey 的圖解示意圖如下:

6. out of memory (OOM)問(wèn)題若干解決辦法

內(nèi)存溢出OOM 可以說(shuō)是運(yùn)行Spark 作業(yè)時(shí)經(jīng)常遇到的問(wèn)題,特別是對(duì)運(yùn)行時(shí)間比較長(zhǎng)的大作業(yè),可能同一個(gè)應(yīng)用程序運(yùn)行在較少量數(shù)據(jù)上一直表現(xiàn)良好,一旦運(yùn)行在大數(shù)據(jù)集上問(wèn)題立馬出來(lái)了。導(dǎo)致OOM 的原因比較多,有些還很復(fù)雜,于是解決方法也不盡相同。個(gè)人感覺(jué),解決OOM 問(wèn)題的過(guò)程,有時(shí)某種程度上也是對(duì)Spark作業(yè)進(jìn)行優(yōu)化的過(guò)程。

導(dǎo)致OOM 的原因之一,可能是作業(yè)內(nèi)部action 過(guò)程比較多,需要不斷將集群上的數(shù)據(jù)下到drive 上來(lái),如當(dāng)時(shí)drive 上內(nèi)存不夠,很可能就產(chǎn)生OOM 了。當(dāng)時(shí)drive上空間不夠,可能是系統(tǒng)本身分配給該作業(yè)的drive 內(nèi)存資源就不夠,也可能是作業(yè)前面階段消耗的內(nèi)存太多導(dǎo)致后來(lái)的階段不夠用了,還可能是action (比如shuffle )內(nèi)部過(guò)程本就有較多內(nèi)存需求但又給予其分配的空間不夠。

以下是對(duì)于解決OOM的若干方法:

設(shè)置spark.executor.memory 盡可能大,可為接近集群?jiǎn)蝹€(gè)node 的所有內(nèi)存

用更多的partition ,其數(shù)可等同于CPU數(shù),適當(dāng)分區(qū)數(shù)目的設(shè)置對(duì)作業(yè)運(yùn)行效率及穩(wěn)定性影響挺大

設(shè)置spark.storage.memory.fraction 更小,默認(rèn)其值為0.6,是專(zhuān)預(yù)留給cache 用的,剩余內(nèi)存給作業(yè)其它部分用。盡量少用cache,經(jīng)驗(yàn)看來(lái)也有挺大幫助

如OOM 發(fā)生于shuffle 過(guò)程,則應(yīng)增加shuffle 階段的內(nèi)存數(shù)量,設(shè)置spark.shuffle.fraction 為更大值

對(duì)大對(duì)象多用Broadcast

可能是內(nèi)存泄漏導(dǎo)致的問(wèn)題,一般檢測(cè)日志會(huì)有類(lèi)似“task serializable as xxx bytes”, xxx一般大于幾M , 內(nèi)存泄露本就是嚴(yán)重的問(wèn)題,這時(shí)首先需要解決的當(dāng)然是內(nèi)存泄露自身

命令行提交作業(yè),--drive-memory 盡可能設(shè)更大。并不意外,實(shí)踐表明,這一條往往成功解決了OOM 問(wèn)題

應(yīng)該說(shuō),Spark 解決了Hadoop 的很多弊端,相比于Hadoop它取得了很多長(zhǎng)足的改進(jìn)。首先是速度上,Spark 運(yùn)行效率提升了幾十甚至百倍,相對(duì)于Hadoop ,Spark是近實(shí)時(shí)的。

隨著版本迭代,現(xiàn)在Spark 系統(tǒng)也越來(lái)越穩(wěn)定了。

本人曾基于RDD 編程,實(shí)現(xiàn)過(guò)股票基金投資組合的最優(yōu)化系統(tǒng),主要是用它完成了帶復(fù)雜約束條件的完整的遺傳算法過(guò)程,測(cè)試下來(lái),相比于量化研究員們慣用的R加SQLServer 的傳統(tǒng)做法求解最優(yōu)化,RDD 方法不論運(yùn)行速度還是最優(yōu)化結(jié)果都獲得了很大的提升,運(yùn)行時(shí)間從一兩個(gè)小時(shí)縮短到了兩分鐘,最優(yōu)解提升了百分之二十。當(dāng)然,也曾考慮過(guò)Java單機(jī)應(yīng)用程序是否可能效率就足夠好甚至更高,不過(guò),排除多個(gè)請(qǐng)求作業(yè)并行運(yùn)行需求之外,當(dāng)遺傳算法初始種群數(shù)量比較大,染色體數(shù)量比較多,約束條件很復(fù)雜,所需繁衍后代次數(shù)非常多的時(shí)候,應(yīng)該可以判定Spark 平臺(tái)下的RDD 應(yīng)用系統(tǒng)仍然會(huì)有明顯的優(yōu)勢(shì)。

Spark ML 機(jī)器學(xué)習(xí)算法庫(kù),經(jīng)過(guò)實(shí)戰(zhàn),表現(xiàn)還是很不錯(cuò)的。做數(shù)據(jù)挖掘項(xiàng)目,有時(shí)可能訓(xùn)練樣本數(shù)量特別多,也可能需要同時(shí)運(yùn)行比較幾百個(gè)模型,這在單機(jī)下要跑很久甚至跑不出來(lái),但是,應(yīng)用Spark ML 庫(kù)可在合理的時(shí)間內(nèi)運(yùn)行完成。

HBase 的速度相對(duì)比較快,它解決了傳統(tǒng)單機(jī)數(shù)據(jù)庫(kù)如Oracle 可擴(kuò)展性差的問(wèn)題。但是HBase的實(shí)時(shí)性只是相對(duì)于map-reduce 的低速來(lái)講的,特別是當(dāng)做大表的join 操作時(shí),HBase/Phoenix 的問(wèn)題表現(xiàn)的很明顯。有內(nèi)部測(cè)試表明,20張表作join 操作,其中大的表有幾億條記錄,小的表紀(jì)錄數(shù)也在百萬(wàn)級(jí),HBase 上幾乎跑不起來(lái),但是Hive on Spark 只要半個(gè)小時(shí)就運(yùn)行出結(jié)果了。事實(shí)上,Hive on Spark已經(jīng)在一些企業(yè)運(yùn)行起來(lái)了,并且反饋的結(jié)果都很不錯(cuò),在某些地方,Hive on Spark 也已經(jīng)逐步替代HBase/Phoenix 成為新的大數(shù)據(jù)平臺(tái)下的數(shù)據(jù)倉(cāng)庫(kù)了。

可以預(yù)見(jiàn),在不久的將來(lái),Spark 終會(huì)在大數(shù)據(jù)平臺(tái)里扮演遠(yuǎn)比Hadoop 更為重要的角色。

本文作者:朱志亮(點(diǎn)融黑幫),復(fù)旦大學(xué)計(jì)算機(jī)系研究生畢業(yè),有三年多傳統(tǒng)IT經(jīng)驗(yàn)和一年多互聯(lián)網(wǎng)大數(shù)據(jù)經(jīng)驗(yàn),對(duì)大數(shù)據(jù)平臺(tái)和數(shù)據(jù)挖掘興趣很大,目前主要專(zhuān)注于大數(shù)據(jù)平臺(tái)下的數(shù)據(jù)挖掘。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容