前方高能減速慢行!
在上一篇RDD結(jié)構(gòu)已經(jīng)介紹完了。雖然RDD結(jié)構(gòu)是spark設(shè)計思想最重要的組成,但是沒有輔助的功能只有結(jié)構(gòu)又不能獨立使用。真正使RDD完成計算優(yōu)化的,就是今天我們要講到的spark RDD的另一個重要組成部分RDD算子。
一、RDD算子的定義
我給RDD算子的定義是:用來生成或處理RDD的方法叫做RDD算子。RDD算子就是一些方法,在Spark框架中起到運算符的作用。算子用來構(gòu)建RDD及數(shù)據(jù)之間的關(guān)系。數(shù)據(jù)可以由算子轉(zhuǎn)換成RDD,也可以由RDD產(chǎn)生新RDD,或者將RDD持久化到磁盤或內(nèi)存。
從技術(shù)角度講RDD算子可能比較枯燥,我們舉個里生活學(xué)習(xí)中的例子來類比RDD算子的作用。
完成計算需要什么呢?
需要數(shù)據(jù)載體和運算方式。數(shù)據(jù)載體可以是數(shù)字,數(shù)組,集合,分區(qū),矩陣等。一個普通的計算器,它的運算單位是數(shù)字,而運算符號是加減乘除,這樣就可以得到結(jié)果并輸出了。一個矩陣通過加減乘除也可以得到結(jié)果,但是結(jié)果跟計算器的加減乘除一樣嗎?非也!


所以說加減乘除在不同的計算框架作用是不同的,而加減乘除這樣的符號就是運算方式。在spark計算框架有自己的運算單位(RDD)和自己的運算符(RDD算子)。
是不是很抽象?下面來點具體的。
二、RDD算子的使用
Spark算子非常豐富,有幾十個,開發(fā)者把算子組合使用,從一個基礎(chǔ)的RDD計算出想要的結(jié)果。并且算子是優(yōu)化Spark計算框架的主要依據(jù)。
我們以top算子舉例,rdd.top(n)獲取RDD的前n個排序后的結(jié)果。
例如計算:文件a的2倍與文件b的TOP 3結(jié)果。

- 窄依賴優(yōu)化:如圖中的RDD1,2,3在Stage3中被優(yōu)化為RDD1到RDD3直接計算。是否可以直接計算是由算子的寬窄依賴決定,推薦使用數(shù)據(jù)流向區(qū)分寬窄依賴: partiton流向子RDD的多個partiton屬于寬依賴,父RDD的partiton流向子RDD一個partiton或多個partiton流向一個子RDD的partiton屬于窄依賴。上圖中的RDD3和RDD4做top(3)操作,top是先排序后取出前3個值,排序過程屬于寬依賴,spark計算過程是逆向的DAG(DAG和拓?fù)渑判蛳乱黄榻B),RDD5不能直接計算,必須等待依賴的RDD完成計算,我把這種算子叫做不可優(yōu)化算子(計算流程不可優(yōu)化,必須等待父RDD的完成),Action算子(后文講解)都是不可優(yōu)化算子,Transformation算子也有很多不可優(yōu)化的算子(寬依賴算子),如:groupbykey,reducebykey,cogroup,join等。
- 數(shù)據(jù)量優(yōu)化:上圖中的a文件數(shù)據(jù)乘2,為什么前面有一個filter,假設(shè)filter過濾后的數(shù)據(jù)減少到三分之一,那么對后續(xù)RDD和shuffle的操作優(yōu)化可想而知。而這只是提供一個思路,并不是說有的過濾都是高效的。
- 利用存儲算子優(yōu)化Lineage:RDD算子中除了save(輸出結(jié)果)算子之外,還有幾個比較特別的算子,用來保存中間結(jié)果的,如:persist,cache 和 checkpoint ,當(dāng)RDD的數(shù)據(jù)保持不變并被復(fù)用多次的時候可以用它們臨時保存計算結(jié)果。
1). cache和persist
修改當(dāng)前RDD的存儲方案StorageLevel,默認(rèn)狀態(tài)下與persist級別是一樣的MEMORY_ONLY級別,保存到內(nèi)存,內(nèi)存不足選擇磁盤。
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
def cache(): this.type = persist()
這2個方法都不會觸發(fā)任務(wù),只是修改了RDD的存儲方案,當(dāng)RDD被執(zhí)行的時候按照方案存儲到相應(yīng)位置。而checkpoint會單獨執(zhí)行一個job,并把數(shù)據(jù)寫入磁盤。
注:不要把RDD cache和Dataframe cache混淆。Dataframe cache將在spark sql中介紹。
2).checkpoint
檢查RDD是否被物化或計算,一般在程序運行比較長或者計算量大的情況下,需要進(jìn)行Checkpoint。這樣可以避免在運行中出現(xiàn)異常導(dǎo)致RDD回溯代價過大的問題。Checkpoint會把數(shù)據(jù)寫在本地磁盤上。Checkpoint的數(shù)據(jù)可以被同一session的多個job共用。
三、RDD算子之間的關(guān)系
算子從否觸發(fā)job的角度劃分,可以分為Transformation算子和Action算子,Transformation算子不會產(chǎn)生job,是惰性算子,只記錄該算子產(chǎn)生的RDD及父RDD的partiton之間的關(guān)系,而Action算子將觸發(fā)job,完成依賴關(guān)系的所有計算操作。
那么如果一個程序里有多個action算子怎么辦?
順序完成action操作,每個action算子產(chǎn)生一個job,上一job的結(jié)果轉(zhuǎn)換成RDD,繼續(xù)給后續(xù)的action使用。多數(shù)action返回結(jié)果都不是RDD,而transformation算子的返回結(jié)果都是RDD,但可能是多個RDD(如:randomSplit,將一個RDD切分成多個RDD)。
一張圖了解所有RDD算子之間的關(guān)系

上圖劃分為4個大塊,從上到下我們順序講起:
- 圖中的RDD dependency正是RDD結(jié)構(gòu)中的private var deps: Seq[Dependency[_]],dependency類被兩個類繼承,NarrowDependency(窄依賴)和ShuffleDependency(寬依賴)。窄依賴又分onetoonedependency和rangedependency,這是窄依賴提供的2種抽樣方式1對1數(shù)據(jù)抽樣和平衡數(shù)據(jù)抽樣,返回值都是一個partitonid的list集合。
- 第二層,是提供RDD底層計算的基本算法,繼承了RDD,并實現(xiàn)了dependency的一種或多種依賴關(guān)系的計算邏輯,并互相調(diào)用實現(xiàn)更復(fù)雜的功能。
- 最下層是Spark API,利用RDD基本的計算實現(xiàn)RDD所有的算子,并調(diào)用多個底層RDD算子實現(xiàn)復(fù)雜的功能。
- 右邊的泛型,是scala的一種類型,可以理解為類的泛型,泛指編譯時被抽象的類型。Spark利用scala的這一特性把依賴關(guān)系抽象成一種泛型結(jié)構(gòu),并不需要真實的數(shù)據(jù)類型參與編譯過程。編譯的結(jié)構(gòu)類由序列化和反序列化到集群的計算節(jié)點取數(shù)并計算。
Transformation:轉(zhuǎn)換算子,這類轉(zhuǎn)換并不觸發(fā)提交作業(yè),完成作業(yè)中間過程處理。Transformation按照數(shù)據(jù)類型又分為兩種,value數(shù)據(jù)類型算子和key-value數(shù)據(jù)類型算子。
1) Value數(shù)據(jù)類型的Transformation算子
Map,flatMap,mapPartitions,glom,union,cartesian,groupBy,filter,distinct,subtract,sample,takeSample
2)Key-Value數(shù)據(jù)類型的Transfromation算子
mapValues,combineByKey,reduceByKey,partitionBy,cogroup,join,leftOuterJoin和rightOuterJoin
Action: 行動算子,這類算子會觸發(fā)SparkContext提交Job作業(yè)。Action算子是用來整合和輸出數(shù)據(jù)的,主要包括以下幾種:
Foreach,HDFS,saveAsTextFile,saveAsObjectFile, collect,collectAsMap,reduceByKeyLocally,lookup,count,top,reduce,fold,aggregate
注:上述舉例算子可能不全,隨著spark的更新也會不斷有新的算子加入其中。