Spark shuffle-write 和 shuffle-read 中對(duì)數(shù)據(jù)傾斜情況的處理

主要想回答兩個(gè)問(wèn)題:

  1. map端(shuffle-write)如何對(duì)數(shù)據(jù)進(jìn)行分片?
  2. reduce端(shuffle-read)如何讀取數(shù)據(jù)?

ShuffleMapTask中,指定此task運(yùn)算真對(duì)上游RDD的那個(gè)partition,即map端的partition,writer.write操作的時(shí)候,根據(jù)RDD的partitioner生成新的partitionId,然后寫(xiě)入,完成shuffle-write,下游shuffle-read的時(shí)候,拉取相應(yīng)得partition數(shù)據(jù)即可;

下面插入一段說(shuō)一下Spark中netty block server的實(shí)現(xiàn):

  1. NettyRpcEnv :: TransportContext-> createServer -> new TransportServer
  2. TransportServer中appRpcHandler就是上層處理邏輯,默認(rèn)沒(méi)有安全配置的情況下,bootstraps集合為空;
  3. TransportServer -> init 初始化bootstrap,其中childHandler定義了對(duì)請(qǐng)求的處理邏輯,即context.initializePipeline(ch, rpcHandler);
  4. TransportContext :: initializePipeLine 定義了處理請(qǐng)求的pipeline,pipeline中包括對(duì)req,rap的encoder,decoder,TransportChannelHandler;
  5. TransportChannelHandler :: channelRead0 根據(jù)message的不同,分別調(diào)用requestHandler和responseHandler進(jìn)行處理,上層的RpcHandler就包含在RequestHandler中;

當(dāng)reduce端讀取數(shù)據(jù)的時(shí)候,ShuffleBlockFetcherIterator :: sendRequest 調(diào)用 NettyBlockTransferService :: fetchBlocks 調(diào)用OneForOneBlockFetcher::start 首先調(diào)用TransportClient :: sendRpcSync 發(fā)送OpenBlocks發(fā)送到上面提到的netty block server,然后發(fā)送ChunkFetchRequest,獲取對(duì)應(yīng)的chunk,這里面的chunk其實(shí)就是一個(gè)一個(gè)的block,一個(gè)(shuffleId, mapId, bucketId(reduceId))唯一確定一個(gè)block,也即下游RDD的一個(gè)partition;

shuffle-read其實(shí)是從上游executor以block為單位獲取數(shù)據(jù),這里就遇到了一個(gè)問(wèn)題,如果數(shù)據(jù)分布不均勻,導(dǎo)致下游某個(gè)partition過(guò)大,即這個(gè)block過(guò)大,就會(huì)出現(xiàn)OOM,Netty會(huì)報(bào)錯(cuò)direct buffer out of memory;
上面說(shuō)的OOM是Netty處理數(shù)據(jù)時(shí)堆外內(nèi)存的OOM,如果限制使用堆外內(nèi)存(為Executor增加配置-Dio.netty.noUnsafe=true,就可以讓shuffle不使用堆外內(nèi)存),會(huì)報(bào)堆內(nèi)內(nèi)存OOM,java.lang.OutOfMemoryError: Java heap space;

如何解決?
其實(shí)在對(duì)Block處理過(guò)程中,無(wú)論是Client端還是Server端,都是以ManagedBuffer來(lái)處理的,具體實(shí)現(xiàn)類有FileSegmentManagedBuffer,NettyManagedBuffer等,Server端收到請(qǐng)求之后,會(huì)將返回的Block封裝在FileSegmentmanagedBuffer,這個(gè)類內(nèi)部不cache數(shù)據(jù),提供從文件中讀取block data的方法,但是過(guò)rpc server時(shí)通過(guò)encoder會(huì)進(jìn)行封裝,從FIleChannel零拷貝寫(xiě)入SocketChannel,具體實(shí)現(xiàn)就是在MessageEncoder里面將FileSegmentBuffer converToNetty,其實(shí)生成時(shí)FileRegion,后面封裝到MessageWithHeader也是FileRegion,寫(xiě)出到List<Object> out,Netty會(huì)調(diào)用FileRegion中的transferTo,將內(nèi)容寫(xiě)到目標(biāo)channel,寫(xiě)入是直接調(diào)用file.transfer,實(shí)現(xiàn)零拷貝;
所以是否可以嘗試添加一個(gè)新的協(xié)議,在OneForOneBlockFetcher中,判斷,如果一個(gè)block小于某值,比如100M,使用原來(lái)的方式fetch數(shù)據(jù),否則,服務(wù)端收到請(qǐng)求之后返回?cái)?shù)據(jù)流,客戶端收到數(shù)據(jù)流之后,將數(shù)據(jù)寫(xiě)到本地文件,形成新的FileSegmentManagedBuffer,供后續(xù)處理,對(duì)比原來(lái)的實(shí)現(xiàn),就是將客戶端直接處理NettyManagedBuffer變成直接處理FileSegmentManagedBuffer;

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容