歡迎關(guān)注公眾號“Tim在路上”
在討論external shuffle service的具體實現(xiàn)之前,我們先來回顧下spark shuffle的大概過程。
spark shuffle分為兩部分shuffle write和shuffle read。
在map write端,對每個task的數(shù)據(jù),不管是按key hash還是在數(shù)據(jù)結(jié)構(gòu)里先聚合再排序,最終都會將數(shù)據(jù)寫到一個partitionFile里面,在partitionFile里面的數(shù)據(jù)是partitionId有序的,外加會生成一個索引文件,索引包含每個partition對應(yīng)偏移量和長度。
而reduce read 端就是從這些partitionFile里面拉取相應(yīng)partitionId的數(shù)據(jù), 然后再進行聚合排序。
現(xiàn)在我們在來看下****external shuffle service(ESS)****,其乍從其名字上看,ESS是spark分布式集群為存儲shuffle data而設(shè)計的分布式組件。但其實它只是Spark通過Executor獲取Shuffle data塊的代理。
我們可以理解為ESS負責(zé)管理shuffle write端生成的shuffle數(shù)據(jù),ESS是和yarn一起使用的, 在yarn集群上的每一個nodemanager上面都運行一個ESS,是一個常駐進程。一個ESS管理每個nodemanager上所有的executor生成的shuffle數(shù)據(jù)??偠灾?,ESS并不是分布式的組件,它的生命周期也不依賴于Executor。
為什么需要ESS ?
在Spark中,Executor進程除了運行task,還要負責(zé)寫shuffle 數(shù)據(jù),以及給其他Executor提供shuffle數(shù)據(jù)。當(dāng)Executor進程任務(wù)過重,導(dǎo)致GC而不能為其他Executor提供shuffle數(shù)據(jù)時,會影響任務(wù)運行。同時,ESS的存在也使得,即使executor掛掉或者回收,都不影響其shuffle數(shù)據(jù),因此只有在ESS開啟情況下才能開啟動態(tài)調(diào)整executor數(shù)目。
因此,spark提供了external shuffle service這個接口,常見的就是spark on yarn中的,YarnShuffleService。這樣,在yarn的nodemanager中會常駐一個externalShuffleService服務(wù)進程來為所有的executor服務(wù),默認為7337端口。
其實在spark中shuffleClient有兩種,一種是blockTransferService,另一種是externalShuffleClient。如果在ESS開啟,那么externalShuffleClient用來fetch shuffle數(shù)據(jù),而blockTransferService用于獲取broadCast等其他BlockManager保存的數(shù)據(jù)。
如果ESS沒有開啟,那么spark就只能使用自己的blockTransferService來拉取所有數(shù)據(jù),包括shuffle數(shù)據(jù)以及broadcast數(shù)據(jù)。
ESS的架構(gòu)與優(yōu)勢
在啟用ESS后,ESS服務(wù)會在node節(jié)點上創(chuàng)建,并且每次存在時,新創(chuàng)建的Executor都會向其注冊。
/** Registers a new Executor with all the configuration we need to find its shuffle files. */
public void registerExecutor(
String appId,
String execId,
ExecutorShuffleInfo executorInfo) {
在注冊過程中,使用appId, execId和ExecutorShuffleInfo(localDirs, shuffleManager類型)作為參數(shù),從參數(shù)信息可以看出Executor會通知ESS服務(wù)它創(chuàng)建在磁盤上文件的存儲位置。由于這些信息,ESS服務(wù)守護進程能夠在檢索過程中將shuffle中間的臨時文件返回給其他執(zhí)行程序。
ESS服務(wù)的存在也會影響文件刪除。在正常情況下(沒有外部 shuffle 服務(wù)),當(dāng)Executor停止時,它會自動刪除生成的文件。但是啟用ESS服務(wù)后,Executor關(guān)閉后文件不會被清理。以下架構(gòu)圖說明了啟用外部 shuffle 服務(wù)時工作程序節(jié)點上發(fā)生的情況:

ESS服務(wù)的一大優(yōu)勢是提高了可靠性。即使其中一個 executor 出現(xiàn)故障,它的 shuffle 文件也不會丟失。另一個優(yōu)點是可擴展性,因為在 Spark 中運行動態(tài)資源分配需要ESS服務(wù),這塊我們后續(xù)在進行介紹。
總之使用Spark ESS 為 Spark Shuffle 操作帶來了以下好處:
- 即使 Spark Executor 正在經(jīng)歷 GC 停頓,Spark ESS 也可以為 Shuffle 塊提供服務(wù)。
- 即使產(chǎn)生它們的 Spark Executor 掛了,Shuffle 塊也能提供服務(wù)。
- 可以釋放閑置的 Spark Executor 來節(jié)省集群的計算資源。
ESS源碼初探
Executors 通過 RPC 協(xié)議與ESS服務(wù)通信,發(fā)送兩種類型的消息:RegisterExecutor和OpenBlocks。當(dāng)Executor想要在其local external shuffle service中注冊時,使用RegisterExecutor, OpenBlocks在獲取shuffle data過程中使用。
在Executor創(chuàng)建的時候,會調(diào)用env.blockManager.initialize(conf.getAppId),在blockManager存儲當(dāng)前node的externalBlockStoreClient ,在其initialize方法中執(zhí)行blockStoreClient.init(appId),這里的blockStoreClient稱為shuffleClient(這里是ExternalShuffleClient)。在shuffleClient 只是將一些實用對象設(shè)置為工廠來創(chuàng)建遠程連接。
稍后 BlockManager 調(diào)用registerWithShuffleServer方法,這時ESS shuffle 服務(wù)會知道executor 存儲 shuffle 文件的位置。
// blockManager類
// [1] executor 向ESS注冊
// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled&& !blockManagerId.isDriver) {
registerWithExternalShuffleServer()
}
// [2] 封裝localDirs, shuffle data的位置信息
val shuffleConfig = new ExecutorShuffleInfo(
diskBlockManager.localDirsString,
diskBlockManager.subDirsPerLocalDir,
shuffleManagerMeta)
// [3] 向ESS發(fā)送RegisterExecutor消息
try (TransportClient client = clientFactory.createClient(host, port)) {
ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer();
//
client.sendRpcSync(registerMessage, registrationTimeoutMs);
}
最終會將其存放在ESS維護的executors列表中,它是以下數(shù)據(jù)結(jié)構(gòu)ConcurrentMap<AppExecId, ExecutorShuffleInfo> 。
接下來我們來分析下,reducer如何通過ESS來獲取shuffle數(shù)據(jù)塊。
獲取shuffle block的請求在ExternalShuffleClient的fetchBlocks方法中生成。獲取的過程使用RetryingBlockFetcher實例,它可以在失敗時重試獲取塊。實際上,獲取過程最終是由OneForOneBlockFetcher類實現(xiàn)的,它負責(zé)發(fā)送請求以檢索所需的塊。
RetryingBlockTransferor.BlockTransferStarter blockFetchStarter =
(inputBlockId, inputListener) -> {
// Unless this client is closed.
if (clientFactory != null) {
assert inputListener instanceof BlockFetchingListener :
"Expecting a BlockFetchingListener, but got " + inputListener.getClass();
TransportClient client = clientFactory.createClient(host, port, maxRetries > 0);
new OneForOneBlockFetcher(client, appId, execId, inputBlockId,
(BlockFetchingListener) inputListener, transportConf, downloadFileManager).start();
} else {
logger.info("This clientFactory was closed. Skipping further block fetch retries.");
}
};
可以看到這里的代碼和我們在shuffle reader中講解的是一致的。
- 首先,請求獲取(block id, chunks 數(shù))組成的鍵值對。
- 其次,請求獲取chunks 塊的具體內(nèi)容。
下面我們再來總結(jié)下chunks塊獲取的詳細流程:

chunks塊的獲取有兩種模式,分別是流模式或批處理模式。
流模式操作是通過 TransportClient 的stream方法實現(xiàn)的。它包括向TransportRequestHandler的實例發(fā)送 StreamRequest 消息。處理程序通知客戶端打開用于發(fā)送所需數(shù)據(jù)的 TCP 連接,然后傳輸發(fā)生在整個連接中,在單個 TCP 連接中向客戶端發(fā)送所需的數(shù)據(jù)。
批處理模式操作是使用 TransportClient 的fetchChunk方法實現(xiàn)的。該請求方法包含要獲取的block的索引。處理程序只向客戶端返回這個特定的數(shù)據(jù)塊,所以它是每個請求響應(yīng)一個塊。
ESS的配置與使用
ESS shuffle 服務(wù)的配置以spark.shuffle.service前綴開頭:
- spark.shuffle.service.enabled - 定義ESS服務(wù)是否啟用。
- spark.shuffle.service.port - 定義運行ESS shuffle 服務(wù)的端口。由于該服務(wù)應(yīng)該與執(zhí)行程序在同一節(jié)點上運行,因此配置中不存在主機。
- spark.shuffle.service.index.cache.size - 確定緩存的大小。在開啟ESS shuffle 服務(wù)情況下,用于緩存存儲索引文件信息。它避免了每次獲取塊時打開/關(guān)閉這些文件。主要用于基于排序的 shuffle 數(shù)據(jù)。
學(xué)完External Shuffle Service,下面是一些思考題:
- External Shuffle Service的優(yōu)勢是什么?shuffle data是否被存儲在ESS中?
- 為什么在Spark動態(tài)資源分配時需要ESS服務(wù)?