RDD是容錯、并行的數(shù)據(jù)結構,具備分區(qū)的屬性,這個分區(qū)可以是單機上分區(qū)也可以是多機上的分區(qū),對于RDD分區(qū)的數(shù)量涉及到這個RDD進行并發(fā)計算的粒度。每一個分區(qū)都會在一個單獨的task中執(zhí)行。
可以為其指定分區(qū)個數(shù),如果從hdfs文件創(chuàng)建的RDD,分區(qū)數(shù)和block數(shù)一致,如果從本地文件中創(chuàng)建RDD,默認是機器的cpu個數(shù)。
//不設置分區(qū)數(shù)量
>val rdd = sc.textFile("/home/reducer2/cluster/hadoop/readme.md")
>rdd.partitions.size
res0: INT = 8 //為cpu的個數(shù)
//設置分區(qū)個數(shù)
>val rdd = sc.textFile("/home/reducer2/cluster/hadoop/readme.md",6);
RDD的首選位置 preferredLocations
spark在執(zhí)行任務時,會盡可能的把算子分配到離數(shù)據(jù)最近的節(jié)點上,減少數(shù)據(jù)的網(wǎng)絡IO,當RDD生成的位置就是首選位置,如果是HDFS生成的RDD,那首選位置就是block所在在的節(jié)點。如果是經(jīng)過轉換后的RDD,則算子應該分配到RDD所在的節(jié)點上。
如果一個大文件40G,生成一個RDD,產(chǎn)生與block個數(shù)一樣多的分區(qū),默認一個block為128M,則會產(chǎn)生320個block(partitions),假如共有5臺natanode平均分配了這320個block,每個datanode上至少有64個分區(qū)?,F(xiàn)在要對分區(qū)1進行計算,第一步則需要知道RDD分區(qū)1的首選位置:
val location = rdd.preferredLocations(rdd.dependecies(0))
res1:Set[String] =(192.168.110.1,192.168.110.2,192.168.110.3) //和block的復制數(shù)一致
而在真實的開發(fā)過程中,我們并沒有去關注單個分區(qū)的執(zhí)行的情況,而是類似于:
val rdds = sc.textFile("/.../..");
varl count = rdds.filter(line=>line.contains("error"))
在執(zhí)行filter的時候,實際上就是每個分區(qū)都會執(zhí)行這個函數(shù),最后會生成一個新的RDD,新RDD的分區(qū)情況可能和先前的分區(qū)大致相同,在相同機器節(jié)點上,減少機器間的數(shù)據(jù)重排。