轉(zhuǎn)載請注明 : [過把火] http://www.itdecent.cn/p/29d17aa23116
序
一直都沒有很系統(tǒng)地閱讀過RDD的原始論文,最近翻出來研讀一遍,并作此記錄。
《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》
總
閱讀完之后,唯一的感覺就是---RDD(彈性分布式數(shù)據(jù)集)完全是伯克利的博士起的一個(gè)很抽象的名字罷了,換句話說就是為了發(fā)論文而起的一個(gè)高大上的名字。但是這并不妨礙RDD的思想依然是diaodiao的。
動(dòng)機(jī)
當(dāng)前很多分布式計(jì)算框架無法實(shí)現(xiàn)高效的迭代式計(jì)算以及交互式數(shù)據(jù)挖掘,包括Hadoop!,首先為了解決高效這個(gè)問題,RDD提出基于內(nèi)存的迭代思想,直接鄙視了Hadoop要不斷進(jìn)行磁盤Spill的弊端;其次,為了保證大數(shù)據(jù)場景下迭代計(jì)算的正常運(yùn)轉(zhuǎn),RDD自身具有高容錯(cuò)快恢復(fù)的特點(diǎn)。
背景及意義
1、Hadoop?
Hadoop為分布式大規(guī)模數(shù)據(jù)的計(jì)算而生,但別忘了,Hadoop依托于HDFS,在沒有與Spark進(jìn)行對比之前,可能并不會(huì)刻意去思考為何需要HDFS,可能這個(gè)問題很容易回答,不就是一個(gè)存儲(chǔ)倉庫嘛,ok,就先這樣認(rèn)為。
2、RDD與Spark的關(guān)系?
Spark就是RDD的具體實(shí)現(xiàn)。
3、Spark VS Hadoop
相同數(shù)據(jù)集的重復(fù)利用的這種特性在很多領(lǐng)域或算法中常見,例如機(jī)器學(xué)習(xí),就是對同一數(shù)據(jù)集不斷進(jìn)行收斂或是梯度下降,不管什么方式,對象都是同一數(shù)據(jù)分片。那么這種業(yè)務(wù)丟給Hadoop-MR來做的話,無非就是每計(jì)算完一次,Spill到HDFS進(jìn)行多副本存儲(chǔ),單就這個(gè)多副本存儲(chǔ)來講,數(shù)據(jù)量一大,IO及磁盤的負(fù)擔(dān)將會(huì)成為整個(gè)業(yè)務(wù)流程的瓶頸,換句話說,做一次PageRank,加入設(shè)定10次收斂,那么MR整體會(huì)寫9次,這還不算,寫完后還得讀出來繼續(xù)迭代,我的天,互聯(lián)網(wǎng)發(fā)展到如今地步,數(shù)不清的網(wǎng)頁,數(shù)不清的圖節(jié)點(diǎn),照這樣進(jìn)行rank,可想而知背后需要多好的磁盤性能來做保障,當(dāng)然,運(yùn)維人員有飯吃那是肯定的。
OK,RDD概念一出,使得這些業(yè)務(wù)邏輯無需每次都得先保存后計(jì)算,而是將數(shù)據(jù)顯式地存儲(chǔ)在內(nèi)存中進(jìn)行迭代,同時(shí)允許用戶控制數(shù)據(jù)分區(qū)。這些還都不是最大的特點(diǎn),RDD最大的特點(diǎn)是能夠通過記錄數(shù)據(jù)集的一些列轉(zhuǎn)換方式來執(zhí)行這些task,這樣一來,某一分片若是丟失,則可以從該RDD的記錄中去就近恢復(fù)該分片,而不是從頭執(zhí)行!最后一點(diǎn),RDD中所記錄的這些所謂的轉(zhuǎn)換方式其實(shí)就是該RDD的誕生方式,也稱作是“血緣”。因此,這種方式在不用大規(guī)模保存副本的同時(shí)也能夠有很好地容錯(cuò)表現(xiàn)。
RDD---Resilient Distributed Datasets
1 RDD概念
1、RDD是一個(gè)只讀的、有分區(qū)的分布式數(shù)據(jù)集。其分類主要有兩種:transformations和action。這兩種RDD負(fù)責(zé)不同的業(yè)務(wù)。transformations負(fù)責(zé)數(shù)據(jù)分片的轉(zhuǎn)換,而action負(fù)責(zé)激活整個(gè)計(jì)算鏈條的實(shí)際計(jì)算。
2、RDD運(yùn)轉(zhuǎn)方式
RDD只需知道自己是怎么誕生的就可以了,這就是RDD的實(shí)際工作方式。
2 RDD與傳統(tǒng)DSM對比
RDD作為對內(nèi)存的抽象,與其相類似的就是分布式共享內(nèi)存年系統(tǒng)(DSM)。
RDDs 只能通過粗粒度的轉(zhuǎn)換被創(chuàng)建(或者被寫) , 然而 DSM 允許對每一個(gè)內(nèi)存位置進(jìn)行讀寫, 這個(gè)是 RDDs 和 DSM 最主要的區(qū)別. 這樣使都 RDDs在 應(yīng)用中大量寫數(shù)據(jù)受到了限制, 但是可以使的容錯(cuò)變的更加高效. 特別是, RDDs 不需要發(fā)生非常耗時(shí)的 checkpoint 操作, 因?yàn)樗梢愿鶕?jù) lineage 進(jìn)行恢復(fù)數(shù)據(jù) . 而且, 只有丟掉了數(shù)據(jù)的分區(qū)才會(huì)需要重新計(jì)算, 并不需要回滾整個(gè)程序, 并且這些重新計(jì)算的任務(wù)是在多臺(tái)機(jī)器上并行運(yùn)算的.
RDDs 的第二個(gè)好處是:它不變的特性使的它可以和 MapReduce 一樣來運(yùn)行執(zhí)行很慢任務(wù)的備份任務(wù)來達(dá)到緩解計(jì)算很慢的節(jié)點(diǎn)的問題. 在 DSM 中, 備份任務(wù)是很難實(shí)現(xiàn)的, 因?yàn)樵既蝿?wù)和備份任務(wù)或同時(shí)更新訪問同一個(gè)內(nèi)存地址和接口.
最后, RDDs 比 DSM 多提供了兩個(gè)好處. 第一, 在對 RDDs 進(jìn)行大量寫操作的過程中, 我們可以根據(jù)數(shù)據(jù)的本地性來調(diào)度 task 以提高性能. 第二, 如果在 scan-base 的操作中, 且這個(gè)時(shí)候內(nèi)存不足以存儲(chǔ)這個(gè) RDDs, 那么 RDDs 可以慢慢的從內(nèi)存中清理掉. 在內(nèi)存中存儲(chǔ)不下的分區(qū)數(shù)據(jù)會(huì)被寫到磁盤中, 且提供了和現(xiàn)有并行數(shù)據(jù)處理系統(tǒng)相同的性能保證.
3 RDD的表達(dá)
RDD的內(nèi)容究竟該包括哪些內(nèi)容才能達(dá)到輕松跟蹤RDD迭代狀態(tài)以及應(yīng)對各種業(yè)務(wù)邏輯的目的呢?理論上講,RDD的transformation類型算子越多,則代表RDD能夠應(yīng)對的場景就越多,而不同的transformation算子能夠由用于任意結(jié)合則會(huì)將極大提升其應(yīng)用場景的范圍。
Spark中RDD的設(shè)計(jì)其客觀表達(dá)是基于DAG圖的形式,圖中的每個(gè)節(jié)點(diǎn)表達(dá)相互獨(dú)立的每個(gè)RDD,而RDD中的編程實(shí)現(xiàn)主要包含的就是5種信息,或叫做5種接口。以下五個(gè)信息可以表達(dá) RDDs: 一個(gè)分區(qū)列表, 每一個(gè)分區(qū)就是數(shù)據(jù)集的原子塊. 一個(gè)父親 RDDs 的依賴列表. 一個(gè)計(jì)算父親的數(shù)據(jù)集的函數(shù). 分區(qū)模式的元數(shù)據(jù)信息以及數(shù)據(jù)存儲(chǔ)信息. 比如, 基于一個(gè) HDFS 文件創(chuàng)建出來的的 RDD 中文件的每一個(gè)數(shù)據(jù)塊就是一個(gè)分區(qū), 并且這個(gè) RDD 知道每一個(gè)數(shù)據(jù)塊存儲(chǔ)在哪些機(jī)器上, 同時(shí), 在這個(gè) RDD 上進(jìn)行 map 操作后的結(jié)果有相同的分區(qū)數(shù), 當(dāng)計(jì)算元素的時(shí)候, 將 map 函數(shù)應(yīng)用到父親 RDD 數(shù)據(jù)中的.

那么,我們定義了每個(gè)RDD需要實(shí)現(xiàn)的接口后,需要考慮的就是如何定義不同RDD之間的依賴關(guān)系!當(dāng)然,對于一個(gè)完整通用的系統(tǒng)而言,需要找到具有普適性的定義方式。RDD的圖表達(dá)中引入兩種依賴關(guān)系:1)寬依賴 。2)窄依賴。
寬依賴:表示父親 RDDs 的一個(gè)分區(qū)可以被子 RDDs 的多個(gè)子分區(qū)所依賴
窄依賴:表示父親 RDDs 的一個(gè)分區(qū)最多被子 RDDs 一個(gè)分區(qū)所依賴
舉例:map 操作是一個(gè)窄依賴, join 操作是一個(gè)寬依賴操作。
下圖是論文中的一個(gè)圖例,其中藍(lán)色實(shí)心矩形表示分區(qū),一個(gè)大的空心矩形代表一個(gè)RDD。

為何要分為這兩種依賴關(guān)系?
第一,:
窄依賴允許所有的父節(jié)點(diǎn)分區(qū)能夠在一臺(tái)節(jié)點(diǎn)中完成計(jì)算。例如可以將每一個(gè)元素進(jìn)行 map 操作后緊接著執(zhí)行filter 操作, 與此相反, 寬依賴需要父親 RDDs 的所有分區(qū)數(shù)據(jù)在不同的節(jié)點(diǎn)之間進(jìn)行重新洗牌和網(wǎng)絡(luò)傳輸類似于MR。
第二:
窄依賴從一個(gè)失敗節(jié)點(diǎn)中恢復(fù)是非常高效的, 因?yàn)橹恍枰匦掠?jì)算相對應(yīng)的父親的分區(qū)數(shù)據(jù)就可以, 而且這個(gè)重新計(jì)算是在不同的節(jié)點(diǎn)進(jìn)行并行重計(jì)算的, 與此相反, 在一個(gè)含有寬依賴的血緣關(guān)系 RDDs 圖中, 一個(gè)節(jié)點(diǎn)的失敗可能導(dǎo)致一些分區(qū)數(shù)據(jù)的丟失, 但是只用重新計(jì)算父 RDD 的所有分區(qū)的數(shù)據(jù)。
4 Job調(diào)度
RDD的延遲性執(zhí)行使得其能夠?qū)崿F(xiàn)交互式執(zhí)行,舉個(gè)例子,你可以在shell窗口中寫一堆transformation的代碼,但是此時(shí)代碼不會(huì)執(zhí)行,你還可以繼續(xù)寫下去,直到你滿意為止,而且中間你可以通過重寫相同變量名的不同方法來覆蓋更新一些變量(RDD),直到最后你使用了一個(gè)action算子后,整個(gè)代碼塊才會(huì)執(zhí)行。這就是交互式操作。
那么整個(gè)代碼塊被激活后JOB是如何調(diào)度的呢?
當(dāng)一個(gè)用戶對某個(gè) RDD 調(diào)用了 action 操作(比如 count 或者 save )的時(shí)候調(diào)度器會(huì)檢查這個(gè) RDD 的血緣關(guān)系圖, 然后根據(jù)這個(gè)血緣關(guān)系圖構(gòu)建一個(gè)含有 stages 的有向無環(huán)圖( DAG ), 最后按照步驟執(zhí)行這個(gè) DAG 中的 stages , 如下圖所示。每一個(gè) stage 包含了盡可能多的帶有窄依賴的 transformations 操作. 這個(gè) stage 的劃分是根據(jù)需要 shuffle 操作的寬依賴或者任何可以不依賴父節(jié)點(diǎn)的RDD, 然后調(diào)度器可以調(diào)度啟動(dòng) tasks 來執(zhí)行父Stage未被執(zhí)行的Stage,一直計(jì)算到最終的RDD。
上圖中空心矩形表示 RDDs ,藍(lán)色的實(shí)心方形表示分區(qū), 黑色的是表示這個(gè)分區(qū)的數(shù)據(jù)存儲(chǔ)在內(nèi)存中, 最后對 RDD-G 調(diào)用 action 操作,。根據(jù)寬依賴生成很多 stages , 且將窄依賴的 transformations 操作放在 stage 中。
調(diào)度器在分配 tasks 的時(shí)候是采用延遲調(diào)度來達(dá)到數(shù)據(jù)本地性的目的(說白了, 就是數(shù)據(jù)在哪里, 計(jì)算就在哪里). 如果某個(gè)分區(qū)的數(shù)據(jù)在某個(gè)節(jié)點(diǎn)上的內(nèi)存中, 那么將這個(gè)分區(qū)的計(jì)算發(fā)送到這個(gè)機(jī)器節(jié)點(diǎn)中. 如果某個(gè) RDD 為它的某個(gè)分區(qū)提供了這個(gè)數(shù)據(jù)存儲(chǔ)的位置節(jié)點(diǎn), 則將這個(gè)分區(qū)的計(jì)算發(fā)送到這個(gè)節(jié)點(diǎn)上.
對于寬依賴(比如 shuffle 依賴), 中間數(shù)據(jù)會(huì)寫入到節(jié)點(diǎn)的磁盤中以利于從錯(cuò)誤中恢復(fù), 這個(gè)和 MapReduce 將 map 后的結(jié)果寫入到磁盤中是很相似的.
5 RDD的內(nèi)存管理
既然RDD基于內(nèi)存迭代,那么內(nèi)存資源需要一定的管理方式使其更高效地被利用。
目前Spark支持三種RDD存儲(chǔ)介質(zhì):1)完全內(nèi)存中的非序列化jvm對象。2)內(nèi)存中的序列化數(shù)據(jù)。3)持久化在磁盤。當(dāng)然,第一種方式是最好的,因?yàn)橛?jì)算速度最快,但是大多時(shí)候內(nèi)存容不下這么多RDD,則會(huì)使用第三種。
內(nèi)存中的RDD怎樣被回收來釋放資源呢?Spark中采用LRU的回收方式,即:如果新的RDD無法被內(nèi)存容納,則內(nèi)存中會(huì)啟動(dòng)LRU策略來將最近很少使用的RDD進(jìn)行清除。
6 容錯(cuò)的更高保證:checkpointing
如果迭代鏈條十分長,那么有必要適當(dāng)進(jìn)行checkpoint來緩存一些中間結(jié)果來保證計(jì)算鏈條的可靠性。
PageRank是一種需要重復(fù)很多次迭代的算法,且真實(shí)場景下該算法適配的數(shù)據(jù)集很大,因此很有必要進(jìn)行checkpoint。checkpoint對于一些有較高存儲(chǔ)保障的RDD來講并沒有用,例如textfile,大多都是從HDFS讀取的原始數(shù)據(jù)。
