主要想回答兩個(gè)問(wèn)題:
- map端(shuffle-write)如何對(duì)數(shù)據(jù)進(jìn)行分片?
- reduce端(shuffle-read)如何讀取數(shù)據(jù)?
ShuffleMapTask中,指定此task運(yùn)算真對(duì)上游RDD的那個(gè)partition,即map端的partition,writer.write操作的時(shí)候,根據(jù)RDD的partitioner生成新的partitionId,然后寫(xiě)入,完成shuffle-write,下游shuffle-read的時(shí)候,拉取相應(yīng)得partition數(shù)據(jù)即可;
下面插入一段說(shuō)一下Spark中netty block server的實(shí)現(xiàn):
- NettyRpcEnv :: TransportContext-> createServer -> new TransportServer
- TransportServer中appRpcHandler就是上層處理邏輯,默認(rèn)沒(méi)有安全配置的情況下,bootstraps集合為空;
- TransportServer -> init 初始化bootstrap,其中childHandler定義了對(duì)請(qǐng)求的處理邏輯,即context.initializePipeline(ch, rpcHandler);
- TransportContext :: initializePipeLine 定義了處理請(qǐng)求的pipeline,pipeline中包括對(duì)req,rap的encoder,decoder,TransportChannelHandler;
- TransportChannelHandler :: channelRead0 根據(jù)message的不同,分別調(diào)用requestHandler和responseHandler進(jìn)行處理,上層的RpcHandler就包含在RequestHandler中;
當(dāng)reduce端讀取數(shù)據(jù)的時(shí)候,ShuffleBlockFetcherIterator :: sendRequest 調(diào)用 NettyBlockTransferService :: fetchBlocks 調(diào)用OneForOneBlockFetcher::start 首先調(diào)用TransportClient :: sendRpcSync 發(fā)送OpenBlocks發(fā)送到上面提到的netty block server,然后發(fā)送ChunkFetchRequest,獲取對(duì)應(yīng)的chunk,這里面的chunk其實(shí)就是一個(gè)一個(gè)的block,一個(gè)(shuffleId, mapId, bucketId(reduceId))唯一確定一個(gè)block,也即下游RDD的一個(gè)partition;
shuffle-read其實(shí)是從上游executor以block為單位獲取數(shù)據(jù),這里就遇到了一個(gè)問(wèn)題,如果數(shù)據(jù)分布不均勻,導(dǎo)致下游某個(gè)partition過(guò)大,即這個(gè)block過(guò)大,就會(huì)出現(xiàn)OOM,Netty會(huì)報(bào)錯(cuò)direct buffer out of memory;
上面說(shuō)的OOM是Netty處理數(shù)據(jù)時(shí)堆外內(nèi)存的OOM,如果限制使用堆外內(nèi)存(為Executor增加配置-Dio.netty.noUnsafe=true,就可以讓shuffle不使用堆外內(nèi)存),會(huì)報(bào)堆內(nèi)內(nèi)存OOM,java.lang.OutOfMemoryError: Java heap space;
如何解決?
其實(shí)在對(duì)Block處理過(guò)程中,無(wú)論是Client端還是Server端,都是以ManagedBuffer來(lái)處理的,具體實(shí)現(xiàn)類有FileSegmentManagedBuffer,NettyManagedBuffer等,Server端收到請(qǐng)求之后,會(huì)將返回的Block封裝在FileSegmentmanagedBuffer,這個(gè)類內(nèi)部不cache數(shù)據(jù),提供從文件中讀取block data的方法,但是過(guò)rpc server時(shí)通過(guò)encoder會(huì)進(jìn)行封裝,從FIleChannel零拷貝寫(xiě)入SocketChannel,具體實(shí)現(xiàn)就是在MessageEncoder里面將FileSegmentBuffer converToNetty,其實(shí)生成時(shí)FileRegion,后面封裝到MessageWithHeader也是FileRegion,寫(xiě)出到List<Object> out,Netty會(huì)調(diào)用FileRegion中的transferTo,將內(nèi)容寫(xiě)到目標(biāo)channel,寫(xiě)入是直接調(diào)用file.transfer,實(shí)現(xiàn)零拷貝;
所以是否可以嘗試添加一個(gè)新的協(xié)議,在OneForOneBlockFetcher中,判斷,如果一個(gè)block小于某值,比如100M,使用原來(lái)的方式fetch數(shù)據(jù),否則,服務(wù)端收到請(qǐng)求之后返回?cái)?shù)據(jù)流,客戶端收到數(shù)據(jù)流之后,將數(shù)據(jù)寫(xiě)到本地文件,形成新的FileSegmentManagedBuffer,供后續(xù)處理,對(duì)比原來(lái)的實(shí)現(xiàn),就是將客戶端直接處理NettyManagedBuffer變成直接處理FileSegmentManagedBuffer;