Spark文檔 - 作業(yè)調(diào)度

概述

Spark提供了幾個可以在計算過程之間調(diào)度資源的工具。首先,每個Spark應(yīng)用程序(SparkContext實例)都運行在獨立的executor進程中,而集群管理器可以跨應(yīng)用程序調(diào)度資源。其次,Spark應(yīng)用程序內(nèi)部的眾多“作業(yè)”可能會并行執(zhí)行,這種場景在處理網(wǎng)絡(luò)請求時很常見。Spark包含了一個公平調(diào)度器在SparkContext內(nèi)部調(diào)度資源。

跨應(yīng)用程序調(diào)度

Spark應(yīng)用程序在集群上運行時都有獨立的executor進程負(fù)責(zé)執(zhí)行任務(wù)和存儲數(shù)據(jù),如果多個用戶想要共享集群,不同的集群管理器有不同的做法。

最簡單的方式是對資源靜態(tài)分區(qū),適用于所有集群管理器。這種方式會為應(yīng)用程序分配最大可用資源,應(yīng)用程序會在整個運行期間都持有并使用這些資源。這個方式通常用在Standalone,YARN和coarse-gained Mesos模式下。資源可以通過如下方式分配:

  • Standalone:該模式下,應(yīng)用程序以FIFO順序執(zhí)行,每個應(yīng)用程序都會嘗試使用所有的資源。可以在應(yīng)用程序內(nèi)部設(shè)置spark.cores.max來限制可用CPU數(shù),或者修改配置文件中的spark.deploy.defaultCores選項來更改默認(rèn)CPU數(shù)。除了控制CPU,應(yīng)用程序還可以通過spark.executor.memory控制內(nèi)存。
  • Mesos:在Mesos上使用靜態(tài)分區(qū)需要設(shè)置spark.mesos.coarsetrue。之后使用spark.cores.maxspark.executor.memory控制每個應(yīng)用程序可用的資源。
  • YARN--num-executors用于控制分配給每個應(yīng)用程序的executor數(shù)。--executor-memory--executor-cores用于控制每個executor可以使用的資源。

第二種方式是在Mesos模式下動態(tài)共享CPU。這種模式下,每個應(yīng)用程序依然擁有固定且獨立的內(nèi)存(使用spark.executor.memory設(shè)置),但是當(dāng)應(yīng)用程序不執(zhí)行任務(wù)時,其他應(yīng)用程序可能使用空閑的CPU執(zhí)行任務(wù)。這種模式適用于擁有大量非活躍應(yīng)用程序的場景,例如不同用戶使用的shell會話。當(dāng)然,它也可能帶來一定的延遲,因為在應(yīng)用程序之間協(xié)調(diào)CPU需要一點時間。使用mesos://并將spark.mesos.coarsefalse即可啟用該模式。

需要注意的是,Spark暫不支持內(nèi)存共享。

動態(tài)資源分配

Spark可以根據(jù)工作負(fù)載動態(tài)調(diào)整應(yīng)用程序使用的資源。這意味著應(yīng)用程序可以在不使用時將資源還給集群并在必要時重新請求資源。這一功能在多個應(yīng)用程序共享資源的時候很有用。

這一功能默認(rèn)未開啟。

配置和設(shè)置

開啟該功能需要兩步:首先設(shè)置spark.dynamicAllocation.enabledtrue,其次在集群中每個worker節(jié)點上設(shè)置一個外部洗牌服務(wù)并在應(yīng)用程序中配置spark.shuffle.service.enabledtrue。使用外部洗牌服務(wù)的目的是為了讓executor移除時不必刪除與之關(guān)聯(lián)的洗牌文件。以下是設(shè)置該服務(wù)的方法:

  • Standalone:設(shè)置spark.shuffle.service.enabledtrue。
  • Mesos:在所有worder節(jié)點上運行$SPARK_HOME/sbin/start-mesos-shuffle-service.sh,還要設(shè)置spark.shuffle.service.enabledtrue
  • YARN:要在每個NodeManager上啟動外部洗牌服務(wù),需要以下步驟:
    1. 找到spark-<version>-yarn-shuffle.jar文件,一般可以在$SPARK_HOME/yarn文件夾下找到。
    2. 將jar包添加到所有NodeManager節(jié)點的類路徑中。
    3. 在每個節(jié)點的yarn-site.xml中,設(shè)置yarn.nodemanager.aux-services的值為spark_shuffle,之后設(shè)置yarn.nodemanager.aux-services.spark_shuffle.classorg.apache.spark.network.yarn.YarnShuffleService。
    4. 通過編輯etc/hadoop/yarn-env.sh文件中YARN_HEAPSIZE屬性,增加NodeManager的堆空間以避免洗牌期間的垃圾收集問題。
    5. 重啟集群中所有的NodeManager。

其他相關(guān)的配置項都在spark.dynamicAllocation.*spark.shuffle.service.*命名空間之下。

資源分配策略

從較高的層次來看,Spark應(yīng)當(dāng)在不使用時歸還executor,并在必要時獲取executor。由于沒有一個明確的方法來預(yù)測一個即將被移除的executor是否會在不久之后執(zhí)行任務(wù),或者即將添加的executor是否是空閑的。我們需要一點啟發(fā)式的方式來確定什么時候移除和請求executor。

請求策略

啟用了動態(tài)分配的應(yīng)用程序會在任務(wù)堆積時請求額外的executor。這個條件說明現(xiàn)有的executor無法有效的處理完提交的任務(wù)。

Spark按輪次請求executor。當(dāng)任務(wù)調(diào)度延遲時間超過spark.dynamicAllocation.schedulerBacklogTimeout秒時會觸發(fā)請求,之后每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout秒后,如果任務(wù)依然堆積會再次觸發(fā)請求。每隔輪次請求的executor數(shù)量按指數(shù)遞增。例如,應(yīng)用程序會在第一次請求新增1個executor,之后每個輪次會請求2,4,8個executor。

使用指數(shù)遞增策略的動機有兩方面。首先,應(yīng)用程序在開始請求executor時應(yīng)當(dāng)謹(jǐn)慎一點,因為可能少量executor就足夠了。其次,如果發(fā)現(xiàn)確實需要更多executor,應(yīng)用程序也可以及時的提高使用的資源。

移除策略

移除executor的策略相對簡單??臻e時間超過spark.dynamicAllocation.executorIdleTimeout的executor會被應(yīng)用程序移除。在多數(shù)情況下,這個條件和請求條件是互斥的。

優(yōu)雅的停用executor

不使用動態(tài)分配時,executor會在執(zhí)行失敗或者應(yīng)用程序退出時退出。這兩種情況下,與executor關(guān)聯(lián)的狀態(tài)都可以安全的廢棄。但是使用了動態(tài)分配后,應(yīng)用程序顯式移除某個executor后依然運行。這時如果應(yīng)用程序嘗試訪問與被移除executor相關(guān)聯(lián)(存儲或者寫入)的狀態(tài),Spark就不得不重新執(zhí)行該狀態(tài)。因此,Spark需要一種機制,通過在移除executor之前保留其狀態(tài)來優(yōu)雅的停用executor。

這種機制對洗牌來說很重要。在洗牌期間,executor首先將自己的映射數(shù)據(jù)輸出到本地磁盤,然后作為服務(wù)器供其它executor獲取這些輸出文件。如果某個事件掉隊了,即某個任務(wù)的執(zhí)行時間比同批次任務(wù)的時間更長,這時動態(tài)分配機制可能會在洗牌完成前就移除了executor,這樣與被移除executor關(guān)聯(lián)的洗牌文件就必須重新計算,這時不必要的。

保留洗牌文件的解決方案是使用外部洗牌服務(wù),該服務(wù)是一個運行在所有worker節(jié)點上的,獨立于應(yīng)用程序和executor的進程。如果啟用了該服務(wù),executor就會從該服務(wù)獲取洗牌文件。這樣executor的狀態(tài)就可以超越executor的生命周期繼續(xù)保留。

除了洗牌文件,executor還會在內(nèi)存或磁盤上緩存數(shù)據(jù)。executor移除后,這些緩存數(shù)據(jù)也無法訪問了。為了緩解這種情況,包含緩存數(shù)據(jù)的executor是不會被移除的??梢酝ㄟ^spark.dynamicAllocation.cachedExecutorIdleTimeout選項配置。

應(yīng)用程序內(nèi)調(diào)度

在應(yīng)用程序內(nèi)部,多個并行作業(yè)可以同時執(zhí)行。這里的“作業(yè)”指的是Spark action(例如savecollect)以及能推導(dǎo)出action的任務(wù)。Spark的調(diào)度器是線程安全的,允許應(yīng)用程序處理多個請求。

Spark調(diào)度器默認(rèn)以FIFO順序執(zhí)行作業(yè)。每個作業(yè)分割成多個“stage”(例如map和reduce階段)。第一個作業(yè)首先獲得使用資源的優(yōu)先級并執(zhí)行任務(wù),之后是第二個作業(yè),以此類推。如果隊列中第一個作業(yè)不需要使用整個集群,那么后續(xù)作業(yè)也可以立即執(zhí)行。但是如果作業(yè)很大,那么后續(xù)作業(yè)會明顯的被延遲。

可以為作業(yè)配置一個公平共享策略。這種策略下,Spark會以“循環(huán)”風(fēng)格為作業(yè)分配任務(wù),這樣左右的作業(yè)都可以得到大致相等的資源。這意味著小型作業(yè)不必等待大型作業(yè)執(zhí)行完畢也可以執(zhí)行,從而得到較好的響應(yīng)時間。

要啟用公平調(diào)度器,只需:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

公平調(diào)度器池

公平調(diào)度器可以將作業(yè)分組成,每個池可以設(shè)置不同的調(diào)度選項。這樣可以為更重要的作業(yè)創(chuàng)建一個“高優(yōu)先級”池,或者是將每個用戶的作業(yè)分組到一起,然后為用戶分配資源。

不使用任何配置,新提交的作業(yè)屬于默認(rèn)池。可以使用spark.scheduler.pool選項為作業(yè)分配池:

// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")

設(shè)置完畢后,所有由該線程提交的作業(yè)都會使用這個池。設(shè)置是線程級別的,如果想移除池,只需要:

sc.setLocalProperty("spark.scheduler.pool", null)

池的默認(rèn)行為

每個池默認(rèn)平分集群資源,但在池內(nèi)部,作業(yè)按照FIFO順序執(zhí)行。

配置池屬性

池屬性可以通過配置文件修改,池支持三個屬性:

  • schedulingMode:可以是FIFOFAIR,控制池內(nèi)部作業(yè)的執(zhí)行情況。
  • weight:控制池相對于其它池的權(quán)重。默認(rèn)為1。如果給某個池的權(quán)重是2,那么這個池會得到其它池兩倍的資源。
  • minShare:除了權(quán)重,還可以為每個池分配最小資源量(例如CPU核心數(shù))。

池屬性通過XML文件設(shè)置,模板在conf/fairscheduler.xml.template中。需要新建fairscheduler.xml文件并放置到類路徑中,或者通過spark.scheduler.allocation.file屬性指定文件路徑:

conf.set("spark.scheduler.allocation.file", "/path/to/file")

以下是XML文件格式:

<?xml version="1.0"?>
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容