[SPARK][CORE] 面試問題之什么是 external shuffle service?

歡迎關(guān)注公眾號“Tim在路上”
在討論external shuffle service的具體實現(xiàn)之前,我們先來回顧下spark shuffle的大概過程。

spark shuffle分為兩部分shuffle write和shuffle read。

在map write端,對每個task的數(shù)據(jù),不管是按key hash還是在數(shù)據(jù)結(jié)構(gòu)里先聚合再排序,最終都會將數(shù)據(jù)寫到一個partitionFile里面,在partitionFile里面的數(shù)據(jù)是partitionId有序的,外加會生成一個索引文件,索引包含每個partition對應(yīng)偏移量和長度。

而reduce read 端就是從這些partitionFile里面拉取相應(yīng)partitionId的數(shù)據(jù), 然后再進行聚合排序。

現(xiàn)在我們在來看下****external shuffle service(ESS)****,其乍從其名字上看,ESS是spark分布式集群為存儲shuffle data而設(shè)計的分布式組件。但其實它只是Spark通過Executor獲取Shuffle data塊的代理。

我們可以理解為ESS負責(zé)管理shuffle write端生成的shuffle數(shù)據(jù),ESS是和yarn一起使用的, 在yarn集群上的每一個nodemanager上面都運行一個ESS,是一個常駐進程。一個ESS管理每個nodemanager上所有的executor生成的shuffle數(shù)據(jù)??偠灾?,ESS并不是分布式的組件,它的生命周期也不依賴于Executor。

為什么需要ESS ?

在Spark中,Executor進程除了運行task,還要負責(zé)寫shuffle 數(shù)據(jù),以及給其他Executor提供shuffle數(shù)據(jù)。當(dāng)Executor進程任務(wù)過重,導(dǎo)致GC而不能為其他Executor提供shuffle數(shù)據(jù)時,會影響任務(wù)運行。同時,ESS的存在也使得,即使executor掛掉或者回收,都不影響其shuffle數(shù)據(jù),因此只有在ESS開啟情況下才能開啟動態(tài)調(diào)整executor數(shù)目。

因此,spark提供了external shuffle service這個接口,常見的就是spark on yarn中的,YarnShuffleService。這樣,在yarn的nodemanager中會常駐一個externalShuffleService服務(wù)進程來為所有的executor服務(wù),默認為7337端口。

其實在spark中shuffleClient有兩種,一種是blockTransferService,另一種是externalShuffleClient。如果在ESS開啟,那么externalShuffleClient用來fetch shuffle數(shù)據(jù),而blockTransferService用于獲取broadCast等其他BlockManager保存的數(shù)據(jù)。

如果ESS沒有開啟,那么spark就只能使用自己的blockTransferService來拉取所有數(shù)據(jù),包括shuffle數(shù)據(jù)以及broadcast數(shù)據(jù)。

ESS的架構(gòu)與優(yōu)勢

在啟用ESS后,ESS服務(wù)會在node節(jié)點上創(chuàng)建,并且每次存在時,新創(chuàng)建的Executor都會向其注冊。

/** Registers a new Executor with all the configuration we need to find its shuffle files. */
public void registerExecutor(
    String appId,
    String execId,
    ExecutorShuffleInfo executorInfo) {

在注冊過程中,使用appId, execId和ExecutorShuffleInfo(localDirs, shuffleManager類型)作為參數(shù),從參數(shù)信息可以看出Executor會通知ESS服務(wù)它創(chuàng)建在磁盤上文件的存儲位置。由于這些信息,ESS服務(wù)守護進程能夠在檢索過程中將shuffle中間的臨時文件返回給其他執(zhí)行程序。

ESS服務(wù)的存在也會影響文件刪除。在正常情況下(沒有外部 shuffle 服務(wù)),當(dāng)Executor停止時,它會自動刪除生成的文件。但是啟用ESS服務(wù)后,Executor關(guān)閉后文件不會被清理。以下架構(gòu)圖說明了啟用外部 shuffle 服務(wù)時工作程序節(jié)點上發(fā)生的情況:

ed.png

ESS服務(wù)的一大優(yōu)勢是提高了可靠性。即使其中一個 executor 出現(xiàn)故障,它的 shuffle 文件也不會丟失。另一個優(yōu)點是可擴展性,因為在 Spark 中運行動態(tài)資源分配需要ESS服務(wù),這塊我們后續(xù)在進行介紹。

總之使用Spark ESS 為 Spark Shuffle 操作帶來了以下好處:

  1. 即使 Spark Executor 正在經(jīng)歷 GC 停頓,Spark ESS 也可以為 Shuffle 塊提供服務(wù)。
  2. 即使產(chǎn)生它們的 Spark Executor 掛了,Shuffle 塊也能提供服務(wù)。
  3. 可以釋放閑置的 Spark Executor 來節(jié)省集群的計算資源。

ESS源碼初探

Executors 通過 RPC 協(xié)議與ESS服務(wù)通信,發(fā)送兩種類型的消息:RegisterExecutorOpenBlocks。當(dāng)Executor想要在其local external shuffle service中注冊時,使用RegisterExecutor, OpenBlocks在獲取shuffle data過程中使用。

在Executor創(chuàng)建的時候,會調(diào)用env.blockManager.initialize(conf.getAppId),在blockManager存儲當(dāng)前node的externalBlockStoreClient ,在其initialize方法中執(zhí)行blockStoreClient.init(appId),這里的blockStoreClient稱為shuffleClient(這里是ExternalShuffleClient)。在shuffleClient 只是將一些實用對象設(shè)置為工廠來創(chuàng)建遠程連接。

稍后 BlockManager 調(diào)用registerWithShuffleServer方法,這時ESS shuffle 服務(wù)會知道executor 存儲 shuffle 文件的位置。

// blockManager類
// [1] executor 向ESS注冊
// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled&& !blockManagerId.isDriver) {
  registerWithExternalShuffleServer()
}

   // [2] 封裝localDirs, shuffle data的位置信息
    val shuffleConfig = new ExecutorShuffleInfo(
      diskBlockManager.localDirsString,
      diskBlockManager.subDirsPerLocalDir,
      shuffleManagerMeta)

 // [3] 向ESS發(fā)送RegisterExecutor消息
    try (TransportClient client = clientFactory.createClient(host, port)) {
      ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer();
      // 
      client.sendRpcSync(registerMessage, registrationTimeoutMs);
    }

最終會將其存放在ESS維護的executors列表中,它是以下數(shù)據(jù)結(jié)構(gòu)ConcurrentMap<AppExecId, ExecutorShuffleInfo> 。

接下來我們來分析下,reducer如何通過ESS來獲取shuffle數(shù)據(jù)塊。

獲取shuffle block的請求在ExternalShuffleClient的fetchBlocks方法中生成。獲取的過程使用RetryingBlockFetcher實例,它可以在失敗時重試獲取塊。實際上,獲取過程最終是由OneForOneBlockFetcher類實現(xiàn)的,它負責(zé)發(fā)送請求以檢索所需的塊。

RetryingBlockTransferor.BlockTransferStarter blockFetchStarter =
    (inputBlockId, inputListener) -> {
      // Unless this client is closed.
      if (clientFactory != null) {
        assert inputListener instanceof BlockFetchingListener :
          "Expecting a BlockFetchingListener, but got " + inputListener.getClass();
        TransportClient client = clientFactory.createClient(host, port, maxRetries > 0);
        new OneForOneBlockFetcher(client, appId, execId, inputBlockId,
          (BlockFetchingListener) inputListener, transportConf, downloadFileManager).start();
      } else {
        logger.info("This clientFactory was closed. Skipping further block fetch retries.");
      }
    };

可以看到這里的代碼和我們在shuffle reader中講解的是一致的。

  1. 首先,請求獲取(block id, chunks 數(shù))組成的鍵值對。
  2. 其次,請求獲取chunks 塊的具體內(nèi)容。

下面我們再來總結(jié)下chunks塊獲取的詳細流程:


U2led.png

chunks塊的獲取有兩種模式,分別是流模式或批處理模式。

流模式操作是通過 TransportClient 的stream方法實現(xiàn)的。它包括向TransportRequestHandler的實例發(fā)送 StreamRequest 消息。處理程序通知客戶端打開用于發(fā)送所需數(shù)據(jù)的 TCP 連接,然后傳輸發(fā)生在整個連接中,在單個 TCP 連接中向客戶端發(fā)送所需的數(shù)據(jù)。

批處理模式操作是使用 TransportClient 的fetchChunk方法實現(xiàn)的。該請求方法包含要獲取的block的索引。處理程序只向客戶端返回這個特定的數(shù)據(jù)塊,所以它是每個請求響應(yīng)一個塊。

ESS的配置與使用

ESS shuffle 服務(wù)的配置以spark.shuffle.service前綴開頭:

  • spark.shuffle.service.enabled - 定義ESS服務(wù)是否啟用。
  • spark.shuffle.service.port - 定義運行ESS shuffle 服務(wù)的端口。由于該服務(wù)應(yīng)該與執(zhí)行程序在同一節(jié)點上運行,因此配置中不存在主機。
  • spark.shuffle.service.index.cache.size - 確定緩存的大小。在開啟ESS shuffle 服務(wù)情況下,用于緩存存儲索引文件信息。它避免了每次獲取塊時打開/關(guān)閉這些文件。主要用于基于排序的 shuffle 數(shù)據(jù)。

學(xué)完External Shuffle Service,下面是一些思考題:

  1. External Shuffle Service的優(yōu)勢是什么?shuffle data是否被存儲在ESS中?
  2. 為什么在Spark動態(tài)資源分配時需要ESS服務(wù)?
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容