Spark Shuffle Read階段是如何讀取數(shù)據(jù)的
- Reduce去拉取Map的輸出數(shù)據(jù),Spark提供了兩套不同的拉取數(shù)據(jù)框架:通過socket連接去取數(shù)據(jù);使用netty框架去取數(shù)據(jù)。
- 每個節(jié)點的Executor會創(chuàng)建一個BlockManager,其中會創(chuàng)建一個BlockManagerWorker用于響應請求。當Reduce的GET_BLOCK的請求過來時,讀取本地文件將這個blockId的數(shù)據(jù)返回給Reduce。如果使用的是Netty框架,BlockManager會創(chuàng)建ShuffleSender用于發(fā)送Shuffle數(shù)據(jù)。
- 并不是所有的數(shù)據(jù)都是通過網(wǎng)絡(luò)讀取,對于在本節(jié)點的Map數(shù)據(jù),Reduce直接去磁盤上讀取而不再通過網(wǎng)絡(luò)框架。
Spark shuffle reducer的結(jié)果是如何存儲的?
- Reduce拖過來數(shù)據(jù)之后以什么方式存儲呢?Spark Map輸出的數(shù)據(jù)沒有經(jīng)過排序,Spark Shuffle過來的數(shù)據(jù)也不會進行排序,Spark認為Shuffle過程中的排序不是必須的,并不是所有類型的Reduce需要的數(shù)據(jù)都需要排序,強制地進行排序只會增加Shuffle的負擔。Reduce拖過來的數(shù)據(jù)會放在一個HashMap中,HashMap中存儲的也是<key, value>對,key是Map輸出的key,Map輸出對應這個key的所有value組成HashMap的value。Spark將Shuffle取過來的每一個<key, value>對插入或者更新到HashMap中,來一個處理一個。HashMap全部放在內(nèi)存中。
- Shuffle取過來的數(shù)據(jù)全部存放在內(nèi)存中,對于數(shù)據(jù)量比較小或者已經(jīng)在Map端做過合并處理的Shuffle數(shù)據(jù),占用內(nèi)存空間不會太大,但是對于比如group by key這樣的操作,Reduce需要得到key對應的所有value,并將這些value組一個數(shù)組放在內(nèi)存中,這樣當數(shù)據(jù)量較大時,就需要較多內(nèi)存。
- 當內(nèi)存不夠時,要不就失敗,要不就用老辦法把內(nèi)存中的數(shù)據(jù)移到磁盤上放著。Spark意識到在處理數(shù)據(jù)規(guī)模遠遠大于內(nèi)存空間時所帶來的不足,引入了一個具有外部排序的方案。Shuffle過來的數(shù)據(jù)先放在內(nèi)存中,當內(nèi)存中存儲的<key, value>對超過1000并且內(nèi)存使用超過70%時,判斷節(jié)點上可用內(nèi)存如果還足夠,則把內(nèi)存緩沖區(qū)大小翻倍,如果可用內(nèi)存不再夠了,則把內(nèi)存中的<key, value>對排序然后寫到磁盤文件中。最后把內(nèi)存緩沖區(qū)中的數(shù)據(jù)排序之后和那些磁盤文件組成一個最小堆,每次從最小堆中讀取最小的數(shù)據(jù),這個和MapReduce中的merge過程類似。
Spark中不需要排序的hash shuffle是否一定比需要排序的sort shuffle速度快?
- 當數(shù)據(jù)規(guī)模小,Hash shuffle快于Sorted Shuffle數(shù)據(jù)規(guī)模大的時候;當數(shù)據(jù)量大,sorted Shuffle會比Hash shuffle快很多,因為數(shù)量大的有很多小文件,不均勻,甚至出現(xiàn)數(shù)據(jù)傾斜,消耗內(nèi)存大,1.x之前spark使用hash,適合處理中小規(guī)模,1.x之后,增加了Sorted shuffle,Spark更能勝任大規(guī)模處理了。
Spark中的HashShufle的有哪些不足?
1)shuffle產(chǎn)生海量的小文件在磁盤上,此時會產(chǎn)生大量耗時的、低效的IO操作;
2).容易導致內(nèi)存不夠用,由于內(nèi)存需要保存海量的文件操作句柄和臨時緩存信息,如果數(shù)據(jù)處理規(guī)模比較大的化,容易出現(xiàn)OOM;
3)容易出現(xiàn)數(shù)據(jù)傾斜,導致OOM。
Spark中Sort-based shuffle的缺陷?
- 如果mapper中task的數(shù)量過大,依舊會產(chǎn)生很多小文件,此時在shuffle傳遞數(shù)據(jù)的過程中reducer段,reduce會需要同時大量的記錄進行反序列化,導致大量的內(nèi)存消耗和GC的巨大負擔,造成系統(tǒng)緩慢甚至崩潰;
- 2)如果需要在分片內(nèi)也進行排序,此時需要進行mapper段和reducer段的兩次排序。