RDD分區(qū)
在分布式程序中,通信的代價是很大的,因此控制數(shù)據(jù)分布以獲得最少的網(wǎng)絡(luò)傳輸可以極大地提升整體性能。所以對RDD進行分區(qū)的目的就是減少網(wǎng)絡(luò)傳輸?shù)拇鷥r以提高系統(tǒng)的性能。
RDD的特性
在講RDD分區(qū)之前,先說一下RDD的特性。
RDD,全稱為Resilient Distributed Datasets,是一個容錯的、并行的數(shù)據(jù)結(jié)構(gòu),可以讓用戶顯式地將數(shù)據(jù)存儲到磁盤和內(nèi)存中,并能控制數(shù)據(jù)的分區(qū)。同時,RDD還提供了一組豐富的操作來操作這些數(shù)據(jù)。在這些操作中,諸如map、flatMap、filter等轉(zhuǎn)換操作實現(xiàn)了monad模式,很好地契合了Scala的集合操作。除此之外,RDD還提供了諸如join、groupBy、reduceByKey等更為方便的操作(注意,reduceByKey是action,而非transformation),以支持常見的數(shù)據(jù)運算。
通常來講,針對數(shù)據(jù)處理有幾種常見模型,包括:Iterative Algorithms,Relational Queries,MapReduce,Stream Processing。例如Hadoop MapReduce采用了MapReduces模型,Storm則采用了Stream Processing模型。RDD混合了這四種模型,使得Spark可以應(yīng)用于各種大數(shù)據(jù)處理場景。
RDD作為數(shù)據(jù)結(jié)構(gòu),本質(zhì)上是一個只讀的分區(qū)記錄集合。一個RDD可以包含多個分區(qū),每個分區(qū)就是一個dataset片段。RDD可以相互依賴。如果RDD的每個分區(qū)最多只能被一個Child RDD的一個分區(qū)使用,則稱之為narrow dependency;若多個Child RDD分區(qū)都可以依賴,則稱之為wide dependency。不同的操作依據(jù)其特性,可能會產(chǎn)生不同的依賴。例如map操作會產(chǎn)生narrow dependency,而join操作則產(chǎn)生wide dependency。
Spark之所以將依賴分為narrow與wide,基于兩點原因。
首先,narrow dependencies可以支持在同一個cluster node上以管道形式執(zhí)行多條命令,例如在執(zhí)行了map后,緊接著執(zhí)行filter。相反,wide dependencies需要所有的父分區(qū)都是可用的,可能還需要調(diào)用類似MapReduce之類的操作進行跨節(jié)點傳遞。
其次,則是從失敗恢復(fù)的角度考慮。narrow dependencies的失敗恢復(fù)更有效,因為它只需要重新計算丟失的parent partition即可,而且可以并行地在不同節(jié)點進行重計算。而wide dependencies牽涉到RDD各級的多個Parent Partitions。下圖說明了narrow dependencies與wide dependencies之間的區(qū)別:

本圖來自Matei Zaharia撰寫的論文An Architecture for Fast and General Data Processing on Large Clusters。圖中,一個box代表一個RDD,一個帶陰影的矩形框代表一個partition。
另外給一張join的操作過程圖片:

RDD分區(qū)
我們分析這樣一個應(yīng)用,它在內(nèi)存中保存著一張很大的用戶信息表(UserData)——也就是一個由(UserId, UserInfo)對組成的RDD,其中UserInfo包含一個該用戶所訂閱的主題列表。該應(yīng)用會周期性性地將這張表與一個小文件進行組合,這個小文件中存著過去五分鐘內(nèi)發(fā)生的事件(events)——其實就是一個由(UserID, LinkInfo)對組成的表。如果我們要進行對用戶訪問情況的統(tǒng)計,就需要對這兩個表進行join操作,以獲得(UserID,UserInfo,LinkInfo)信息。
如圖pic-2默認(rèn)情況下,join操作會將兩個數(shù)據(jù)集中的所有的鍵的哈希值都求出來,將哈希值相同的記錄傳送到同一臺機器上,之后在該機器上對所有鍵相同的記錄進行join操作。
所以這種情況之下,每次進行join都會有數(shù)據(jù)混洗的問題。造成了很大的網(wǎng)絡(luò)傳輸開銷。
這種情況之下由于UserData表比events表要大得多,所以選擇將UserData進行分區(qū)。如果對UserData進行分區(qū),之后Spark就會知曉該RDD是根據(jù)鍵的哈希值來分區(qū)的,這樣在調(diào)用join()時,Spark就會利用這一點。當(dāng)調(diào)用UserData.join(events)時,Spark只會對events進行數(shù)據(jù)混洗操作,將events中特定的UserID的記錄發(fā)送到userData的對應(yīng)分區(qū)所在的那臺機器上。如下圖:

自定義分區(qū)
我們都知道Spark內(nèi)部提供了HashPartitioner和RangePartitioner兩種分區(qū)策略,這兩種分區(qū)策略在很多情況下都適合我們的場景。但是有些情況下,Spark內(nèi)部不能符合咱們的需求,這時候我們就可以自定義分區(qū)策略。為此,Spark提供了相應(yīng)的接口,我們只需要擴展Partitioner抽象類,然后實現(xiàn)里面的三個方法:

def numPartitions: Int:這個方法需要返回你想要創(chuàng)建分區(qū)的個數(shù);
def getPartition(key: Any): Int:這個函數(shù)需要對輸入的key做計算,然后返回該key的分區(qū)ID,范圍一定是0到numPartitions-1;
equals():這個是Java標(biāo)準(zhǔn)的判斷相等的函數(shù),之所以要求用戶實現(xiàn)這個函數(shù)是因為Spark內(nèi)部會比較兩個RDD的分區(qū)是否一樣。
假如我們想把來自同一個域名的URL放到一臺節(jié)點上,比如:http://www.csdn.net和http://www.csdn.net/bolg/,如果你使用HashPartitioner,這兩個URL的Hash值可能不一樣,這就使得這兩個URL被放到不同的節(jié)點上。所以這種情況下我們就需要自定義我們的分區(qū)策略,可以如下實現(xiàn):

因為hashCode值可能為負(fù)數(shù),所以我們需要對他進行處理。然后我們就可以在partitionBy()方法里面使用我們的分區(qū):

類似的,在Java中定義自己的分區(qū)策略和Scala類似,只需要繼承org.apache.spark.Partitioner,并實現(xiàn)其中的方法即可。
在Python中,你不需要擴展Partitioner類,我們只需要對iteblog.partitionBy()加上一個額外的hash函數(shù),如下:
