Spark蓋中蓋(一篇頂五篇)-1 RDD的數(shù)據(jù)結(jié)構(gòu)模型

前言:自Google發(fā)表三大論文GFS、MapReduce、BigTable以來(lái),衍生出的開(kāi)源框架越來(lái)越多,其中Hadoop更是以高可用、高擴(kuò)展、高容錯(cuò)等特性形成了開(kāi)源工業(yè)界事實(shí)標(biāo)準(zhǔn)。Hadoop是一個(gè)可以搭建在廉價(jià)PC上的分布式集群生態(tài)體系,用戶(hù)可以在不清楚底層運(yùn)行細(xì)節(jié)的情況下,開(kāi)發(fā)出自己的分布式應(yīng)用。但是Hadoop MapReduce由于其設(shè)計(jì)初衷并不是為了滿(mǎn)足循環(huán)式數(shù)據(jù)流處理,因此在多并行運(yùn)行的數(shù)據(jù)可復(fù)用場(chǎng)景(如:迭代式機(jī)器學(xué)習(xí)、交互式數(shù)據(jù)處理)中存在諸多的問(wèn)題,所以Spark應(yīng)運(yùn)而生。Spark在傳統(tǒng)的Mapreduce計(jì)算框架的基礎(chǔ)上,將計(jì)算單元縮小到更適合并行計(jì)算和重復(fù)使用的RDD。

一、 什么是RDD?
二、 RDD是怎樣形成的?
三、 RDD結(jié)構(gòu)拆分及源碼解讀

一、 什么是RDD?
RDD(Resilient Distributed Datasets),是Spark最為核心的概念。
從字面上,直譯為彈性分布式數(shù)據(jù)集。所謂“彈性”,一種簡(jiǎn)單解釋是指RDD是橫向多分區(qū)的,縱向當(dāng)計(jì)算過(guò)程中內(nèi)存不足時(shí)可刷寫(xiě)到磁盤(pán)等外存上,可與外存做靈活的數(shù)據(jù)交換;而另一種個(gè)人更偏向的解釋是RDD是由虛擬數(shù)據(jù)結(jié)構(gòu)組成,并不包含真實(shí)數(shù)據(jù)本體,RDD使用了一種“血統(tǒng)”的容錯(cuò)機(jī)制,在結(jié)構(gòu)更新和丟失后可隨時(shí)根據(jù)血統(tǒng)進(jìn)行數(shù)據(jù)模型的重建。所謂“分布式”,就是可以分布在多臺(tái)機(jī)器上進(jìn)行并行計(jì)算。
從空間結(jié)構(gòu)上,可以理解為是一組只讀的、可分區(qū)的分布式數(shù)據(jù)集合,該集合內(nèi)包含了多個(gè)分區(qū)。分區(qū)就是依照特定規(guī)則,將具有相同屬性的數(shù)據(jù)記錄放在一起。每個(gè)分區(qū)相當(dāng)于一個(gè)數(shù)據(jù)集片段。下圖簡(jiǎn)單表示了一個(gè)RDD的結(jié)構(gòu):

Paste_Image.png

RDD是一個(gè)只讀的有屬性的數(shù)據(jù)集。屬性用來(lái)描述當(dāng)前數(shù)據(jù)集的狀態(tài),數(shù)據(jù)集是由數(shù)據(jù)的分區(qū)(partition)組成,并(由block)映射成真實(shí)數(shù)據(jù)。RDD屬性包括名稱(chēng)、分區(qū)類(lèi)型、父RDD指針、數(shù)據(jù)本地化、數(shù)據(jù)依賴(lài)關(guān)系等,主要屬性可以分為3類(lèi):

  1. 與其他RDD 的關(guān)系(parents)
  2. 數(shù)據(jù)(partitioner,checkpoint,storagelevel,iterator)
  3. RDD自身屬性(rddname,sparkcontext,sparkconf)
    之后RDD源碼解析里詳細(xì)介紹每個(gè)屬性。

二、 RDD是怎樣形成的?
有需要在Spark內(nèi)計(jì)算的數(shù)據(jù)即形成RDD,所以開(kāi)始輸入到Spark的數(shù)據(jù)和經(jīng)過(guò)Spark算子(下文介紹算子)計(jì)算過(guò)的數(shù)據(jù)都會(huì)形成RDD,包括即將輸出的數(shù)據(jù)也會(huì)生成RDD后統(tǒng)一輸出的。如圖。

RDD生成

關(guān)于RDD的形成, 主要是通過(guò)連接物理存儲(chǔ)輸入的數(shù)據(jù)集和在已有RDD基礎(chǔ)上進(jìn)行相關(guān)計(jì)算操作衍生的。下面我們就通過(guò)一個(gè)大數(shù)據(jù)開(kāi)源生態(tài)經(jīng)典的例子(Wordcount)來(lái)描述下RDD的產(chǎn)生過(guò)程。強(qiáng)大的Scala代碼如下。

Paste_Image.png

初識(shí)的小伙伴們會(huì)感覺(jué)很神奇,四行代碼就全部搞定了嗎,之前的MR代碼可是碼了一大堆呢……的確如此想學(xué)好Spark的小伙伴們,還是要掌握Scala這門(mén)語(yǔ)言,廢話(huà)不多說(shuō),簡(jiǎn)單解釋下這幾行代碼:
第一行,從HDFS上讀取in.txt文件,創(chuàng)建了第一個(gè)RDD
第二行,按空格分詞,扁平化處理,生成第二個(gè)RDD,每個(gè)詞計(jì)數(shù)為1,生成了第三個(gè)RDD。這里可能有人會(huì)問(wèn),為什么生成了兩個(gè)RDD呢,因?yàn)榇诵写aRDD經(jīng)過(guò)了兩次算子轉(zhuǎn)換(transformation)操作。關(guān)于算子這里不多詳述,請(qǐng)關(guān)注下期文章。
第三行,按每個(gè)詞分組,累加求和,生成第四個(gè)RDD
第四行,將Wordcount統(tǒng)計(jì)結(jié)果輸出到HDFS
整個(gè)產(chǎn)生過(guò)程如下圖所示:

Paste_Image.png

RDD的依賴(lài)關(guān)系
通過(guò)上文的例子可以了解到,一個(gè)作業(yè)從開(kāi)始到結(jié)束的計(jì)算過(guò)程中產(chǎn)生了多個(gè)RDD,RDD之間是彼此相互依賴(lài)的,我們把這種父子依賴(lài)的關(guān)系,稱(chēng)之為“血統(tǒng)”。如果父RDD的每個(gè)分區(qū)最多只能被子RDD的一個(gè)分區(qū)使用,我們稱(chēng)之為(narrow dependency)窄依賴(lài);若一個(gè)父RDD的每個(gè)分區(qū)可以被子RDD的多個(gè)分區(qū)使用,我們稱(chēng)之為(wide dependency)寬依賴(lài)。簡(jiǎn)單來(lái)講窄依賴(lài)就是父子RDD分區(qū)間”一對(duì)一“的關(guān)系,寬依賴(lài)就是”一對(duì)多“關(guān)系,具體理解可參考下圖:

Paste_Image.png

那么為什么Spark要將依賴(lài)分成這兩種呢,下面我們就了解下原因:
首先,從計(jì)算過(guò)程來(lái)看,窄依賴(lài)是數(shù)據(jù)以管道方式經(jīng)一系列計(jì)算操作可以運(yùn)行在了一個(gè)集群節(jié)點(diǎn)上,如(map、filter等),寬依賴(lài)則可能需要將數(shù)據(jù)通過(guò)跨節(jié)點(diǎn)傳遞后運(yùn)行(如groupByKey),有點(diǎn)類(lèi)似于MR的shuffle過(guò)程。
其次,從失敗恢復(fù)來(lái)看,窄依賴(lài)的失敗恢復(fù)起來(lái)更高效,因?yàn)樗恍枵业礁窻DD的一個(gè)對(duì)應(yīng)分區(qū)即可,而且可以在不同節(jié)點(diǎn)上并行計(jì)算做恢復(fù);寬依賴(lài)則牽涉到父RDD的多個(gè)分區(qū),恢復(fù)起來(lái)相對(duì)復(fù)雜些。
綜上, 這里引入了一個(gè)新的概念Stage。Stage可以簡(jiǎn)單理解為是由一組RDD組成的可進(jìn)行優(yōu)化的執(zhí)行計(jì)劃。如果RDD的衍生關(guān)系都是窄依賴(lài),則可放在同一個(gè)Stage中運(yùn)行,若RDD的依賴(lài)關(guān)系為寬依賴(lài),則要?jiǎng)澐值讲煌腟tage。這樣Spark在執(zhí)行作業(yè)時(shí),會(huì)按照Stage的劃分, 生成一個(gè)完整的最優(yōu)的執(zhí)行計(jì)劃。下面引用一張比較流行的圖片輔助大家理解Stage,如圖RDD?-A到RDD-B和RDD-F到RDD-G均屬于寬依賴(lài),所以與前面的父RDD劃分到了不同的Stage中。

Paste_Image.png

三、 RDD結(jié)構(gòu)拆分及源碼解讀
到這里,相信大家已經(jīng)對(duì)RDD有了大體的了解,但要詳細(xì)了解RDD的內(nèi)部結(jié)構(gòu),請(qǐng)繼續(xù)耐心往下看。先貼一張RDD的內(nèi)部結(jié)構(gòu)圖

Paste_Image.png

RDD 的屬性主要包括(rddname、sparkcontext、sparkconf、parent、dependency、partitioner、checkpoint、storageLevel),下面我們先簡(jiǎn)單逐一了解下:

1.rddname
即rdd的名稱(chēng)
2.sparkcontext
SparkContext為Spark job的入口,由Spark driver創(chuàng)建在client端,包括集群連接,RddID,創(chuàng)建抽樣,累加器,廣播變量等信息。
3.sparkconf配置信息,即sc.conf
Spark參數(shù)配置信息
提供三個(gè)位置用來(lái)配置系統(tǒng):
Spark api:控制大部分的應(yīng)用程序參數(shù),可以用SparkConf對(duì)象或者Java系統(tǒng)屬性設(shè)置
環(huán)境變量:可以通過(guò)每個(gè)節(jié)點(diǎn)的conf/spark-env.sh腳本設(shè)置。例如IP地址、端口等信息
日志配置:可以通過(guò)log4j.properties配置

4.parent
指向依賴(lài)父RDD的partition id,利用dependencies方法可以查找該RDD所依賴(lài)的partiton id的List集合,即上圖中的parents。
5.iterator
迭代器,用來(lái)查找當(dāng)前RDD Partition與父RDD中Partition的血緣關(guān)系。并通過(guò)StorageLevel確定迭代位置,直到確定真實(shí)數(shù)據(jù)的位置。迭代方式分為checkpoint迭代和RDD迭代, 如果StorageLevel為NONE則執(zhí)行computeOrReadCheckpoint計(jì)算并獲取數(shù)據(jù),此方法也是一個(gè)迭代器,迭代checkpoint數(shù)據(jù)存放位置,迭代出口為找到真實(shí)數(shù)據(jù)或內(nèi)存。如果Storagelevel不為空,根據(jù)存儲(chǔ)級(jí)別進(jìn)入RDD迭代器,繼續(xù)迭代父RDD的結(jié)構(gòu),迭代出口為真實(shí)數(shù)據(jù)或內(nèi)存。迭代器內(nèi)部有數(shù)據(jù)本地化判斷,先從本地獲取數(shù)據(jù),如果沒(méi)有則遠(yuǎn)程查找。
6.prisist
rdd存儲(chǔ)的level,即通過(guò)storagelevel和是否可覆蓋判斷,
storagelevel分為 5中狀態(tài) ,useDisk, useMemory, useOffHeap, deserialized, replication 可組合使用。

if (useOffHeap) { 
require(!useDisk, "Off-heap storage level does not support using disk") 
require(!useMemory, "Off-heap storage level does not support using heap memory") require(!deserialized, "Off-heap storage level does not support deserialized storage") require(replication == 1, "Off-heap storage level does not support multiple replication") }
persist :
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    // If this is the first time this RDD is marked for persisting, register it
    // with the SparkContext for cleanups and accounting. Do this only once.
    if (storageLevel == StorageLevel.NONE) {
      sc.cleaner.foreach(_.registerRDDForCleanup(this))
      sc.persistRDD(this)
    }
    storageLevel = newLevel
    this
  }

7.partitioner 分區(qū)方式
RDD的分區(qū)方式。RDD的分區(qū)方式主要包含兩種(Hash和Range),這兩種分區(qū)類(lèi)型都是針對(duì)K-V類(lèi)型的數(shù)據(jù)。如是非K-V類(lèi)型,則分區(qū)為None。 Hash是以key作為分區(qū)條件的散列分布,分區(qū)數(shù)據(jù)不連續(xù),極端情況也可能散列到少數(shù)幾個(gè)分區(qū)上,導(dǎo)致數(shù)據(jù)不均等;Range按Key的排序平衡分布,分區(qū)內(nèi)數(shù)據(jù)連續(xù),大小也相對(duì)均等。
8.checkpoint
Spark提供的一種緩存機(jī)制,當(dāng)需要計(jì)算的RDD過(guò)多時(shí),為了避免重新計(jì)算之前的RDD,可以對(duì)RDD做checkpoint處理,檢查RDD是否被物化或計(jì)算,并將結(jié)果持久化到磁盤(pán)或HDFS。與spark提供的另一種緩存機(jī)制cache相比, cache緩存數(shù)據(jù)由executor管理,當(dāng)executor消失了,被cache的數(shù)據(jù)將被清除,RDD重新計(jì)算,而checkpoint將數(shù)據(jù)保存到磁盤(pán)或HDFS,job可以從checkpoint點(diǎn)繼續(xù)計(jì)算。

def checkpoint(): Unit = RDDCheckpointData.synchronized {
 // NOTE: we use a global lock here due to complexities downstream with ensuring 
// children RDD partitions point to the correct parent partitions. In the future 
// we should revisit this consideration.
 if (context.checkpointDir.isEmpty) { 
throw new SparkException("Checkpoint directory has not been set in the SparkContext") 
} else if (checkpointData.isEmpty) {
    checkpointData = Some(new ReliableRDDCheckpointData(this))
 } 
}

9.storageLevel
一個(gè)枚舉類(lèi)型,用來(lái)記錄RDD的存儲(chǔ)級(jí)別。存儲(chǔ)介質(zhì)主要包括內(nèi)存、磁盤(pán)和堆外內(nèi)存,另外還包含是否序列化操作以及副本數(shù)量。如:MEMORY_AND_DISK_SER代表數(shù)據(jù)可以存儲(chǔ)在內(nèi)存和磁盤(pán),并且以序列化的方式存儲(chǔ)。是判斷數(shù)據(jù)是否保存磁盤(pán)或者內(nèi)存的條件。
storagelevel結(jié)構(gòu):

 class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)

綜上所述Spark RDD和Spark RDD算子組成了計(jì)算的基本單位,并由數(shù)據(jù)流向的依賴(lài)關(guān)系形成非循環(huán)數(shù)據(jù)流模型(DAG),形成Spark基礎(chǔ)計(jì)算框架。

spark 蓋中蓋五連載
RDD算子詳解

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容