1.自動進行內(nèi)存和磁盤存儲的切換
spark會優(yōu)先將數(shù)據(jù)存儲在內(nèi)存中,如果內(nèi)存放不下,才把數(shù)據(jù)寫入磁盤,不但能計算內(nèi)存中的數(shù)據(jù),也能計算內(nèi)存放不下的數(shù)據(jù)。
2.基于Lineage(血統(tǒng))高容錯機制
Lineage是基于spark的依賴關(guān)系來完成,每個操作只關(guān)聯(lián)父操作,各分片之間的數(shù)據(jù)互不影響,出現(xiàn)錯誤的時候只需要恢復單個的split特定部分。
常規(guī)容錯方式有兩種:
數(shù)據(jù)檢查點
通過數(shù)據(jù)中心的網(wǎng)絡連接各臺機器,如果發(fā)生checkPoint的時候就需要復制數(shù)據(jù),復制是要通過網(wǎng)絡傳輸?shù)模虼司W(wǎng)絡寬帶是分布式的瓶頸,對存儲的資源也是很大的消耗。
記錄數(shù)據(jù)的更新
當有數(shù)據(jù)更新的時候,就需要記錄數(shù)據(jù),這種方式不需要復制數(shù)據(jù)集。
- RDD是不可變的且lazy的
- RDD的寫操作是粗粒度的、讀操作可以是粗粒度,也可以是細粒度。
3.Task失敗會進行特定次數(shù)的重試
默認重試次數(shù)是4次。TaskSchedulimpl的源碼如下:
def this(sc: SparkContext) = {
this(
sc,
sc.conf.get(config.MAX_TASK_FAILURES),
TaskSchedulerImpl.maybeCreateBlacklistTracker(sc))
}
private[spark] val MAX_TASK_FAILURES =
ConfigBuilder("spark.task.maxFailures")
.intConf
.createWithDefault(4)
4.Stage失敗,會自動進行特定次數(shù)的重試
Stage可以跟蹤多個StageInfo(存儲SparkListener監(jiān)聽到的所有Stage信息,將Stage信息傳遞給Listeners或web UI)。重試默認次數(shù)是4次,且可以直接運行計算失敗的階段,只計算失敗的數(shù)據(jù)分片,具體Stage源碼如下:
private[spark] object DAGScheduler {
// The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
// this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
// as more failure events come in
val RESUBMIT_TIMEOUT = 200
// Number of consecutive stage attempts allowed before a stage is aborted
val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
}
5.checkpoint和persist(檢查點和持久化),可以主動或被動觸發(fā)
checkpoint是對RDD進行的標記,會產(chǎn)生一系列的文件,且所有父依賴都會被刪除,是整個依賴的終點。checkpoint是lazy級別的。
persist后,RDD的每個分片會保存在內(nèi)存或磁盤中,下一次使用相同RDD進行其他action計算的時候,就可以重用。
6.數(shù)據(jù)調(diào)度彈性、DAGSchedule、TaskSchedule調(diào)度和資源調(diào)度無關(guān)
spark講執(zhí)行模型抽象成有向無環(huán)圖(Stage),各個Stage之間可以串行或這并行,從而不需要把Stage的中間結(jié)果輸出到HDFS中,當節(jié)點發(fā)生故障時,其他節(jié)點可以替代該節(jié)點運行。