Spark會(huì)把數(shù)據(jù)都載入到內(nèi)存么?

這篇文章算是個(gè)科普貼。如果已經(jīng)熟悉Spark的就略過(guò)吧。

前言

很多初學(xué)者其實(shí)對(duì)Spark的編程模式還是RDD這個(gè)概念理解不到位,就會(huì)產(chǎn)生一些誤解。

比如,很多時(shí)候我們常常以為一個(gè)文件是會(huì)被完整讀入到內(nèi)存,然后做各種變換,這很可能是受兩個(gè)概念的誤導(dǎo):

  1. RDD的定義,RDD是一個(gè)分布式的不可變數(shù)據(jù)集合
  2. Spark 是一個(gè)內(nèi)存處理引擎

如果你沒(méi)有主動(dòng)對(duì)RDDCache/Persist,它不過(guò)是一個(gè)概念上存在的虛擬數(shù)據(jù)集,你實(shí)際上是看不到這個(gè)RDD的數(shù)據(jù)的全集的(他不會(huì)真的都放到內(nèi)存里)。

RDD的本質(zhì)是什么

一個(gè)RDD 本質(zhì)上是一個(gè)函數(shù),而RDD的變換不過(guò)是函數(shù)的嵌套。RDD我認(rèn)為有兩類(lèi):

  1. 輸入RDD,典型如KafkaRDD,JdbcRDD
  2. 轉(zhuǎn)換RDD,如MapPartitionsRDD

我們以下面的代碼為例做分析:

sc.textFile("abc.log").map().saveAsTextFile("")
  • textFile 會(huì)構(gòu)建出一個(gè)NewHadoopRDD,
  • map函數(shù)運(yùn)行后會(huì)構(gòu)建出一個(gè)MapPartitionsRDD
  • saveAsTextFile觸發(fā)了實(shí)際流程代碼的執(zhí)行

所以RDD不過(guò)是對(duì)一個(gè)函數(shù)的封裝,當(dāng)一個(gè)函數(shù)對(duì)數(shù)據(jù)處理完成后,我們就得到一個(gè)RDD的數(shù)據(jù)集(是一個(gè)虛擬的,后續(xù)會(huì)解釋)。

NewHadoopRDD是數(shù)據(jù)來(lái)源,每個(gè)parition負(fù)責(zé)獲取數(shù)據(jù),獲得過(guò)程是通過(guò)iterator.next 獲得一條一條記錄的。假設(shè)某個(gè)時(shí)刻拿到了一條數(shù)據(jù)A,這個(gè)A會(huì)立刻被map里的函數(shù)處理得到B(完成了轉(zhuǎn)換),然后開(kāi)始寫(xiě)入到HDFS上。其他數(shù)據(jù)重復(fù)如此。所以整個(gè)過(guò)程:

  • 理論上某個(gè)MapPartitionsRDD里實(shí)際在內(nèi)存里的數(shù)據(jù)等于其Partition的數(shù)目,是個(gè)非常小的數(shù)值。
  • NewHadoopRDD則會(huì)略多些,因?yàn)閷儆跀?shù)據(jù)源,讀取文件,假設(shè)讀取文件的buffer是1M,那么最多也就是partitionNum*1M 數(shù)據(jù)在內(nèi)存里
  • saveAsTextFile也是一樣的,往HDFS寫(xiě)文件,需要buffer,最多數(shù)據(jù)量為 buffer* partitionNum

所以整個(gè)過(guò)程其實(shí)是流式的過(guò)程,一條數(shù)據(jù)被各個(gè)RDD所包裹的函數(shù)處理。

剛才我反復(fù)提到了嵌套函數(shù),怎么知道它是嵌套的呢?

如果你寫(xiě)了這樣一個(gè)代碼:

sc.textFile("abc.log").map().map().........map().saveAsTextFile("")

有成千上萬(wàn)個(gè)map,很可能就堆棧溢出了。為啥?實(shí)際上是函數(shù)嵌套太深了。

按上面的邏輯,內(nèi)存使用其實(shí)是非常小的,10G內(nèi)存跑100T數(shù)據(jù)也不是難事。但是為什么Spark常常因?yàn)閮?nèi)存問(wèn)題掛掉呢? 我們接著往下看。

Shuffle的本質(zhì)是什么?

這就是為什么要分Stage了。每個(gè)Stage其實(shí)就是我上面說(shuō)的那樣,一套數(shù)據(jù)被N個(gè)嵌套的函數(shù)處理(也就是你的transform動(dòng)作)。遇到了Shuffle,就被切開(kāi)來(lái),所謂的Shuffle,本質(zhì)上是把數(shù)據(jù)按規(guī)則臨時(shí)都落到磁盤(pán)上,相當(dāng)于完成了一個(gè)saveAsTextFile的動(dòng)作,不過(guò)是存本地磁盤(pán)。然后被切開(kāi)的下一個(gè)Stage則以本地磁盤(pán)的這些數(shù)據(jù)作為數(shù)據(jù)源,重新走上面描述的流程。

我們?cè)僮鲆淮蚊枋觯?/p>

所謂Shuffle不過(guò)是把處理流程切分,給切分的上一段(我們稱(chēng)為Stage M)加個(gè)存儲(chǔ)到磁盤(pán)的Action動(dòng)作,把切分的下一段(Stage M+1)數(shù)據(jù)源變成Stage M存儲(chǔ)的磁盤(pán)文件。每個(gè)Stage都可以走我上面的描述,讓每條數(shù)據(jù)都可以被N個(gè)嵌套的函數(shù)處理,最后通過(guò)用戶(hù)指定的動(dòng)作進(jìn)行存儲(chǔ)。

為什么Shuffle 容易導(dǎo)致Spark掛掉

前面我們提到,Shuffle不過(guò)是偷偷的幫你加上了個(gè)類(lèi)似saveAsLocalDiskFile的動(dòng)作。然而,寫(xiě)磁盤(pán)是一個(gè)高昂的動(dòng)作。所以我們盡可能的把數(shù)據(jù)先放到內(nèi)存,再批量寫(xiě)到文件里,還有讀磁盤(pán)文件也是給費(fèi)內(nèi)存的動(dòng)作。把數(shù)據(jù)放內(nèi)存,就遇到個(gè)問(wèn)題,比如10000條數(shù)據(jù),到底會(huì)占用多少內(nèi)存?這個(gè)其實(shí)很難預(yù)估的。所以一不小心,就容易導(dǎo)致內(nèi)存溢出了。這其實(shí)也是一個(gè)很無(wú)奈的事情。

我們做Cache/Persist意味著什么?

其實(shí)就是給某個(gè)Stage加上了一個(gè)saveAsMemoryBlockFile的動(dòng)作,然后下次再要數(shù)據(jù)的時(shí)候,就不用算了。這些存在內(nèi)存的數(shù)據(jù)就表示了某個(gè)RDD處理后的結(jié)果。這個(gè)才是說(shuō)為啥Spark是內(nèi)存計(jì)算引擎的地方。在MR里,你是要放到HDFS里的,但Spark允許你把中間結(jié)果放內(nèi)存里。

總結(jié)

我們從一個(gè)較新的角度解釋了RDD 和Shuffle 都是一個(gè)什么樣的東西。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容